From 40987e13fda95ae21cd2cd887c2a709db74fadd9 Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 9 Jul 2020 16:39:26 +0300 Subject: [PATCH] Import in v0.10 Concurrent mechanism --- apier/v1/concreqs_it_test.go | 297 ++++++++++++++++++ apier/v1/core.go | 15 +- apier/v1/sessionsbirpc.go | 143 +++++++-- apier/v1/smgbirpc.go | 31 +- cmd/cgr-engine/cgr-engine.go | 3 +- config/config_defaults.go | 2 + config/config_json_test.go | 2 + config/config_test.go | 4 + config/generalcfg.go | 106 ++++--- config/generalcfg_test.go | 48 +-- config/libconfig_json.go | 2 + data/conf/samples/conc_reqs_busy/cgrates.json | 42 +++ .../conf/samples/conc_reqs_queue/cgrates.json | 41 +++ utils/concureqs.go | 69 ++++ utils/concureqs_gob_codec.go | 96 ++++++ utils/concureqs_json_codec.go | 133 ++++++++ utils/consts.go | 54 ++-- utils/server.go | 9 +- 18 files changed, 972 insertions(+), 125 deletions(-) create mode 100644 apier/v1/concreqs_it_test.go create mode 100644 data/conf/samples/conc_reqs_busy/cgrates.json create mode 100644 data/conf/samples/conc_reqs_queue/cgrates.json create mode 100644 utils/concureqs.go create mode 100644 utils/concureqs_gob_codec.go create mode 100644 utils/concureqs_json_codec.go diff --git a/apier/v1/concreqs_it_test.go b/apier/v1/concreqs_it_test.go new file mode 100644 index 000000000..6ecff5add --- /dev/null +++ b/apier/v1/concreqs_it_test.go @@ -0,0 +1,297 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/rpc" + "path" + "strings" + "sync" + "testing" + "time" + + "github.com/cenkalti/rpc2" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + concReqsCfgPath string + concReqsCfg *config.CGRConfig + concReqsRPC *rpc.Client + concReqsBiRPC *rpc2.Client + concReqsConfigDIR string //run tests for specific configuration + + sTestsConcReqs = []func(t *testing.T){ + testConcReqsInitCfg, + testConcReqsStartEngine, + testConcReqsRPCConn, + testConcReqsBusyAPIs, + testConcReqsQueueAPIs, + testConcReqsOnHTTPBusy, + testConcReqsOnHTTPQueue, + testConcReqsOnBiJSONBusy, + testConcReqsOnBiJSONQueue, + testConcReqsKillEngine, + } +) + +//Test start here +func TestConcReqsBusyJSON(t *testing.T) { + concReqsConfigDIR = "conc_reqs_busy" + for _, stest := range sTestsConcReqs { + t.Run(concReqsConfigDIR, stest) + } +} + +func TestConcReqsQueueJSON(t *testing.T) { + concReqsConfigDIR = "conc_reqs_queue" + for _, stest := range sTestsConcReqs { + t.Run(concReqsConfigDIR, stest) + } +} + +func TestConcReqsBusyGOB(t *testing.T) { + concReqsConfigDIR = "conc_reqs_busy" + encoding = utils.StringPointer(utils.MetaGOB) + for _, stest := range sTestsConcReqs { + t.Run(concReqsConfigDIR, stest) + } +} + +func TestConcReqsQueueGOB(t *testing.T) { + concReqsConfigDIR = "conc_reqs_queue" + encoding = utils.StringPointer(utils.MetaGOB) + for _, stest := range sTestsConcReqs { + t.Run(concReqsConfigDIR, stest) + } +} + +func testConcReqsInitCfg(t *testing.T) { + var err error + concReqsCfgPath = path.Join(*dataDir, "conf", "samples", concReqsConfigDIR) + concReqsCfg, err = config.NewCGRConfigFromPath(concReqsCfgPath) + if err != nil { + t.Error(err) + } + concReqsCfg.DataFolderPath = *dataDir + config.SetCgrConfig(concReqsCfg) +} + +// Start CGR Engine +func testConcReqsStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(concReqsCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func handlePing(clnt *rpc2.Client, arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) + *reply = utils.OK + return nil +} + +// Connect rpc client to rater +func testConcReqsRPCConn(t *testing.T) { + var err error + concReqsRPC, err = newRPCClient(concReqsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } + if concReqsBiRPC, err = utils.NewBiJSONrpcClient(concReqsCfg.SessionSCfg().ListenBijson, + nil); err != nil { + t.Fatal(err) + } +} + +func testConcReqsBusyAPIs(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var failedAPIs int + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsRPC.Call(utils.CoreSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + lock.Lock() + failedAPIs++ + lock.Unlock() + wg.Done() + return + } + wg.Done() + }() + } + wg.Wait() + if failedAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsQueueAPIs(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsRPC.Call(utils.CoreSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + }() + } + wg.Wait() +} + +func testConcReqsOnHTTPBusy(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var fldAPIs int64 + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + resp, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + wg.Done() + t.Error(err) + return + } + resp.Body.Close() + if strings.Contains(string(contents), "denying request due to maximum active requests reached") { + lock.Lock() + fldAPIs++ + lock.Unlock() + } + wg.Done() + return + }(i) + } + wg.Wait() + if fldAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsOnHTTPQueue(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func(index int) { + _, err := http.Post("http://localhost:2080/jsonrpc", "application/json", bytes.NewBuffer([]byte(fmt.Sprintf(`{"method": "CoreSv1.Sleep", "params": [{"DurationTime":10000000}], "id":%d}`, index)))) + if err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + return + }(i) + } + wg.Wait() +} + +func testConcReqsOnBiJSONBusy(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_busy" { + t.SkipNow() + } + var failedAPIs int + wg := new(sync.WaitGroup) + lock := new(sync.Mutex) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + lock.Lock() + failedAPIs++ + lock.Unlock() + wg.Done() + return + } + wg.Done() + }() + } + wg.Wait() + if failedAPIs < 2 { + t.Errorf("Expected at leat 2 APIs to wait") + } +} + +func testConcReqsOnBiJSONQueue(t *testing.T) { + if concReqsConfigDIR != "conc_reqs_queue" { + t.SkipNow() + } + wg := new(sync.WaitGroup) + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + var resp string + if err := concReqsBiRPC.Call(utils.SessionSv1Sleep, + &DurationArgs{DurationTime: time.Duration(10 * time.Millisecond)}, + &resp); err != nil { + wg.Done() + t.Error(err) + return + } + wg.Done() + }() + } + wg.Wait() +} + +func testConcReqsKillEngine(t *testing.T) { + time.Sleep(100 * time.Millisecond) + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/core.go b/apier/v1/core.go index b9dba1b8e..88cbbe096 100644 --- a/apier/v1/core.go +++ b/apier/v1/core.go @@ -19,6 +19,8 @@ along with this program. If not, see package v1 import ( + "time" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -42,8 +44,19 @@ func (cS *CoreSv1) Status(arg *utils.TenantWithArgDispatcher, reply *map[string] return cS.cS.Status(arg, reply) } -// Ping used to detreminate if component is active +// Ping used to determinate if component is active func (cS *CoreSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error { *reply = utils.Pong return nil } + +type DurationArgs struct { + DurationTime time.Duration +} + +// Sleep is used to test the concurrent requests mechanism +func (cS *CoreSv1) Sleep(arg *DurationArgs, reply *string) error { + time.Sleep(arg.DurationTime) + *reply = utils.OK + return nil +} diff --git a/apier/v1/sessionsbirpc.go b/apier/v1/sessionsbirpc.go index d85a446a0..4e41da59b 100644 --- a/apier/v1/sessionsbirpc.go +++ b/apier/v1/sessionsbirpc.go @@ -19,6 +19,8 @@ along with this program. If not, see package v1 import ( + "time" + "github.com/cenkalti/rpc2" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -51,108 +53,207 @@ func (ssv1 *SessionSv1) Handlers() map[string]interface{} { utils.SessionSv1SetPassiveSession: ssv1.BiRPCv1SetPassiveSession, utils.SessionSv1ActivateSessions: ssv1.BiRPCv1ActivateSessions, utils.SessionSv1DeactivateSessions: ssv1.BiRPCv1DeactivateSessions, + + utils.SessionSv1Sleep: ssv1.BiRPCV1Sleep, // Sleep method is used to test the concurrent requests mechanism } } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEvent(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReply) error { + rply *sessions.V1AuthorizeReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1AuthorizeEventWithDigest(clnt *rpc2.Client, args *sessions.V1AuthorizeArgs, - rply *sessions.V1AuthorizeReplyWithDigest) error { + rply *sessions.V1AuthorizeReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1AuthorizeEventWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSession(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitSessionReply) error { + rply *sessions.V1InitSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1InitiateSessionWithDigest(clnt *rpc2.Client, args *sessions.V1InitSessionArgs, - rply *sessions.V1InitReplyWithDigest) error { + rply *sessions.V1InitReplyWithDigest) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1InitiateSessionWithDigest(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1UpdateSession(clnt *rpc2.Client, args *sessions.V1UpdateSessionArgs, - rply *sessions.V1UpdateSessionReply) error { + rply *sessions.V1UpdateSessionReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1UpdateSession(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1SyncSessions(clnt *rpc2.Client, args *string, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SyncSessions(clnt, "", rply) } func (ssv1 *SessionSv1) BiRPCv1TerminateSession(clnt *rpc2.Client, args *sessions.V1TerminateSessionArgs, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1TerminateSession(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, rply *string) error { +func (ssv1 *SessionSv1) BiRPCv1ProcessCDR(clnt *rpc2.Client, cgrEv *utils.CGREventWithArgDispatcher, + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessCDR(clnt, cgrEv, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessMessage(clnt *rpc2.Client, args *sessions.V1ProcessMessageArgs, - rply *sessions.V1ProcessMessageReply) error { + rply *sessions.V1ProcessMessageReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessMessage(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ProcessEvent(clnt *rpc2.Client, args *sessions.V1ProcessEventArgs, - rply *sessions.V1ProcessEventReply) error { + rply *sessions.V1ProcessEventReply) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ProcessEvent(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetActiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetActiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessions(clnt *rpc2.Client, args *utils.SessionFilter, - rply *[]*sessions.ExternalSession) error { + rply *[]*sessions.ExternalSession) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessions(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1GetPassiveSessionsCount(clnt *rpc2.Client, args *utils.SessionFilter, - rply *int) error { + rply *int) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1ForceDisconnect(clnt *rpc2.Client, args *utils.SessionFilter, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ForceDisconnect(clnt, args, rply) } func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, args string, - rply *string) error { + rply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply) } -func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, reply *string) error { +func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ping(ign, reply) } func (ssv1 *SessionSv1) BiRPCv1ReplicateSessions(clnt *rpc2.Client, - args sessions.ArgsReplicateSessions, reply *string) error { + args sessions.ArgsReplicateSessions, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.BiRPCv1ReplicateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1SetPassiveSession(clnt *rpc2.Client, - args *sessions.Session, reply *string) error { + args *sessions.Session, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1SetPassiveSession(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1ActivateSessions(clnt *rpc2.Client, - args []string, reply *string) error { + args []string, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1ActivateSessions(clnt, args, reply) } func (ssv1 *SessionSv1) BiRPCv1DeactivateSessions(clnt *rpc2.Client, - args []string, reply *string) error { + args []string, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return ssv1.Ss.BiRPCv1DeactivateSessions(clnt, args, reply) } + +func (ssv1 *SessionSv1) BiRPCV1Sleep(clnt *rpc2.Client, arg *DurationArgs, + reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() + time.Sleep(arg.DurationTime) + *reply = utils.OK + return nil +} diff --git a/apier/v1/smgbirpc.go b/apier/v1/smgbirpc.go index 7cd2ed611..f2824d898 100644 --- a/apier/v1/smgbirpc.go +++ b/apier/v1/smgbirpc.go @@ -20,6 +20,7 @@ package v1 import ( "github.com/cenkalti/rpc2" + "github.com/cgrates/cgrates/utils" ) // Publishes methods exported by SMGenericV1 as SMGenericV1 (so we can handle standard RPC methods via birpc socket) @@ -35,30 +36,50 @@ func (smgv1 *SMGenericV1) Handlers() map[string]interface{} { /// Returns MaxUsage (for calls in seconds), -1 for no limit func (smgv1 *SMGenericV1) BiRPCV1GetMaxUsage(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1GetMaxUsage(clnt, ev, maxUsage) } // Called on session start, returns the maximum number of seconds the session can last func (smgv1 *SMGenericV1) BiRPCV1InitiateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1InitiateSession(clnt, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (smgv1 *SMGenericV1) BiRPCV1UpdateSession(clnt *rpc2.Client, - ev map[string]interface{}, maxUsage *float64) error { + ev map[string]interface{}, maxUsage *float64) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1UpdateSession(clnt, ev, maxUsage) } // Called on session end, should stop debit loop func (smgv1 *SMGenericV1) BiRPCV1TerminateSession(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1TerminateSession(clnt, ev, reply) } // Called on session end, should send the CDR to CDRS func (smgv1 *SMGenericV1) BiRPCV1ProcessCDR(clnt *rpc2.Client, - ev map[string]interface{}, reply *string) error { + ev map[string]interface{}, reply *string) (err error) { + if err = utils.ConReqs.Allocate(); err != nil { + return + } + defer utils.ConReqs.Deallocate() return smgv1.Ss.BiRPCV1ProcessCDR(clnt, ev, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ca4217f8b..0ae2ef627 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -389,7 +389,8 @@ func main() { lgLevel = *logLevel } utils.Logger.SetLogLevel(lgLevel) - + // init the concurrentRequests + utils.ConReqs = utils.NewConReqs(cfg.GeneralCfg().ConcurrentRequests, cfg.GeneralCfg().ConcurrentStrategy) utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, goVers)) cfg.LazySanityCheck() diff --git a/config/config_defaults.go b/config/config_defaults.go index cfed98566..7160733d4 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -52,6 +52,8 @@ const CGRATES_CFG_JSON = ` "digest_equal": ":", // equal symbol used in case of digests "rsr_separator": ";", // separator used within RSR fields "max_parralel_conns": 100, // the maximum number of connection used by the *parallel strategy + "concurrent_requests": 0, // number of active concurrent requests + "concurrent_strategy": "*busy", // strategy for limit active concurrent requests }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 169d8f332..e04860da4 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -62,6 +62,8 @@ func TestDfGeneralJsonCfg(t *testing.T) { Digest_equal: utils.StringPointer(":"), Rsr_separator: utils.StringPointer(";"), Max_parralel_conns: utils.IntPointer(100), + Concurrent_requests: utils.IntPointer(0), + Concurrent_strategy: utils.StringPointer(utils.MetaBusy), } if gCfg, err := dfCgrJsonCfg.GeneralJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index c114c3126..6ba943475 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1898,6 +1898,8 @@ func TestGeneralCfg(t *testing.T) { "digest_equal": ":", "rsr_separator": ";", "max_parralel_conns": 100, + "concurrent_requests": 0, + "concurrent_strategy": "", }, }` eMap := map[string]interface{}{ @@ -1925,6 +1927,8 @@ func TestGeneralCfg(t *testing.T) { "digest_equal": ":", "rsr_separator": ";", "max_parralel_conns": 100, + "concurrent_requests": 0, + "concurrent_strategy": "", } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { t.Error(err) diff --git a/config/generalcfg.go b/config/generalcfg.go index 80b49a38d..9429a6783 100644 --- a/config/generalcfg.go +++ b/config/generalcfg.go @@ -27,30 +27,32 @@ import ( // General config section type GeneralCfg struct { - NodeID string // Identifier for this engine instance - Logger string // dictates the way logs are displayed/stored - LogLevel int // system wide log level, nothing higher than this will be logged - HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate - RoundingDecimals int // Number of decimals to round end prices at - DBDataEncoding string // The encoding used to store object data in strings: - TpExportPath string // Path towards export folder for offline Tariff Plans - PosterAttempts int // Time to wait before writing the failed posts in a single file - FailedPostsDir string // Directory path where we store failed http requests - FailedPostsTTL time.Duration // Directory path where we store failed http requests - DefaultReqType string // Use this request type if not defined on top - DefaultCategory string // set default type of record - DefaultTenant string // set default tenant - DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - DefaultCaching string - ConnectAttempts int // number of initial connection attempts before giving up - Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> - ConnectTimeout time.Duration // timeout for RPC connection attempts - ReplyTimeout time.Duration // timeout replies if not reaching back - LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks - DigestSeparator string // - DigestEqual string // - RSRSep string // separator used to split RSRParser (by degault is used ";") - MaxParralelConns int // the maximum number of connection used by the *parallel strategy + NodeID string // Identifier for this engine instance + Logger string // dictates the way logs are displayed/stored + LogLevel int // system wide log level, nothing higher than this will be logged + HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate + RoundingDecimals int // Number of decimals to round end prices at + DBDataEncoding string // The encoding used to store object data in strings: + TpExportPath string // Path towards export folder for offline Tariff Plans + PosterAttempts int // Time to wait before writing the failed posts in a single file + FailedPostsDir string // Directory path where we store failed http requests + FailedPostsTTL time.Duration // Directory path where we store failed http requests + DefaultReqType string // Use this request type if not defined on top + DefaultCategory string // set default type of record + DefaultTenant string // set default tenant + DefaultTimezone string // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + DefaultCaching string + ConnectAttempts int // number of initial connection attempts before giving up + Reconnects int // number of recconect attempts in case of connection lost <-1 for infinite | nb> + ConnectTimeout time.Duration // timeout for RPC connection attempts + ReplyTimeout time.Duration // timeout replies if not reaching back + LockingTimeout time.Duration // locking mechanism timeout to avoid deadlocks + DigestSeparator string // + DigestEqual string // + RSRSep string // separator used to split RSRParser (by degault is used ";") + MaxParralelConns int // the maximum number of connection used by the *parallel strategy + ConcurrentRequests int + ConcurrentStrategy string } //loadFromJsonCfg loads General config from JsonCfg @@ -139,6 +141,12 @@ func (gencfg *GeneralCfg) loadFromJsonCfg(jsnGeneralCfg *GeneralJsonCfg) (err er if jsnGeneralCfg.Max_parralel_conns != nil { gencfg.MaxParralelConns = *jsnGeneralCfg.Max_parralel_conns } + if jsnGeneralCfg.Concurrent_requests != nil { + gencfg.ConcurrentRequests = *jsnGeneralCfg.Concurrent_requests + } + if jsnGeneralCfg.Concurrent_strategy != nil { + gencfg.ConcurrentStrategy = *jsnGeneralCfg.Concurrent_strategy + } return nil } @@ -162,29 +170,31 @@ func (gencfg *GeneralCfg) AsMapInterface() map[string]interface{} { } return map[string]interface{}{ - utils.NodeIDCfg: gencfg.NodeID, - utils.LoggerCfg: gencfg.Logger, - utils.LogLevelCfg: gencfg.LogLevel, - utils.HttpSkipTlsVerifyCfg: gencfg.HttpSkipTlsVerify, - utils.RoundingDecimalsCfg: gencfg.RoundingDecimals, - utils.DBDataEncodingCfg: utils.Meta + gencfg.DBDataEncoding, - utils.TpExportPathCfg: gencfg.TpExportPath, - utils.PosterAttemptsCfg: gencfg.PosterAttempts, - utils.FailedPostsDirCfg: gencfg.FailedPostsDir, - utils.FailedPostsTTLCfg: failedPostsTTL, - utils.DefaultReqTypeCfg: gencfg.DefaultReqType, - utils.DefaultCategoryCfg: gencfg.DefaultCategory, - utils.DefaultTenantCfg: gencfg.DefaultTenant, - utils.DefaultTimezoneCfg: gencfg.DefaultTimezone, - utils.DefaultCachingCfg: gencfg.DefaultCaching, - utils.ConnectAttemptsCfg: gencfg.ConnectAttempts, - utils.ReconnectsCfg: gencfg.Reconnects, - utils.ConnectTimeoutCfg: connectTimeout, - utils.ReplyTimeoutCfg: replyTimeout, - utils.LockingTimeoutCfg: lockingTimeout, - utils.DigestSeparatorCfg: gencfg.DigestSeparator, - utils.DigestEqualCfg: gencfg.DigestEqual, - utils.RSRSepCfg: gencfg.RSRSep, - utils.MaxParralelConnsCfg: gencfg.MaxParralelConns, + utils.NodeIDCfg: gencfg.NodeID, + utils.LoggerCfg: gencfg.Logger, + utils.LogLevelCfg: gencfg.LogLevel, + utils.HttpSkipTlsVerifyCfg: gencfg.HttpSkipTlsVerify, + utils.RoundingDecimalsCfg: gencfg.RoundingDecimals, + utils.DBDataEncodingCfg: utils.Meta + gencfg.DBDataEncoding, + utils.TpExportPathCfg: gencfg.TpExportPath, + utils.PosterAttemptsCfg: gencfg.PosterAttempts, + utils.FailedPostsDirCfg: gencfg.FailedPostsDir, + utils.FailedPostsTTLCfg: failedPostsTTL, + utils.DefaultReqTypeCfg: gencfg.DefaultReqType, + utils.DefaultCategoryCfg: gencfg.DefaultCategory, + utils.DefaultTenantCfg: gencfg.DefaultTenant, + utils.DefaultTimezoneCfg: gencfg.DefaultTimezone, + utils.DefaultCachingCfg: gencfg.DefaultCaching, + utils.ConnectAttemptsCfg: gencfg.ConnectAttempts, + utils.ReconnectsCfg: gencfg.Reconnects, + utils.ConnectTimeoutCfg: connectTimeout, + utils.ReplyTimeoutCfg: replyTimeout, + utils.LockingTimeoutCfg: lockingTimeout, + utils.DigestSeparatorCfg: gencfg.DigestSeparator, + utils.DigestEqualCfg: gencfg.DigestEqual, + utils.RSRSepCfg: gencfg.RSRSep, + utils.MaxParralelConnsCfg: gencfg.MaxParralelConns, + utils.ConcurrentRequestsCfg: gencfg.ConcurrentRequests, + utils.ConcurrentStrategyCfg: gencfg.ConcurrentStrategy, } } diff --git a/config/generalcfg_test.go b/config/generalcfg_test.go index 427240f10..babd95564 100644 --- a/config/generalcfg_test.go +++ b/config/generalcfg_test.go @@ -60,28 +60,32 @@ func TestGeneralCfgloadFromJsonCfg(t *testing.T) { "locking_timeout": "0", // timeout internal locks to avoid deadlocks "digest_separator": ",", "digest_equal": ":", + "concurrent_requests": 0, + "concurrent_strategy": "", } }` expected = GeneralCfg{ - NodeID: "", - Logger: "*syslog", - LogLevel: 6, - HttpSkipTlsVerify: false, - RoundingDecimals: 5, - DBDataEncoding: "msgpack", - TpExportPath: "/var/spool/cgrates/tpe", - PosterAttempts: 3, - FailedPostsDir: "/var/spool/cgrates/failed_posts", - DefaultReqType: "*rated", - DefaultCategory: "call", - DefaultTenant: "cgrates.org", - DefaultTimezone: "Local", - ConnectAttempts: 3, - Reconnects: -1, - ConnectTimeout: time.Duration(1 * time.Second), - ReplyTimeout: time.Duration(2 * time.Second), - DigestSeparator: ",", - DigestEqual: ":", + NodeID: "", + Logger: "*syslog", + LogLevel: 6, + HttpSkipTlsVerify: false, + RoundingDecimals: 5, + DBDataEncoding: "msgpack", + TpExportPath: "/var/spool/cgrates/tpe", + PosterAttempts: 3, + FailedPostsDir: "/var/spool/cgrates/failed_posts", + DefaultReqType: "*rated", + DefaultCategory: "call", + DefaultTenant: "cgrates.org", + DefaultTimezone: "Local", + ConnectAttempts: 3, + Reconnects: -1, + ConnectTimeout: time.Duration(1 * time.Second), + ReplyTimeout: time.Duration(2 * time.Second), + DigestSeparator: ",", + DigestEqual: ":", + ConcurrentRequests: 0, + ConcurrentStrategy: utils.EmptyString, } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { t.Error(err) @@ -121,7 +125,9 @@ func TestGeneralCfgAsMapInterface(t *testing.T) { "digest_separator": ",", "digest_equal": ":", "rsr_separator": ";", - "max_parralel_conns": 100, + "max_parralel_conns": 100, + "concurrent_requests": 0, + "concurrent_strategy": "", }, }` eMap := map[string]interface{}{ @@ -149,6 +155,8 @@ func TestGeneralCfgAsMapInterface(t *testing.T) { "digest_equal": ":", "rsr_separator": ";", "max_parralel_conns": 100, + "concurrent_requests": 0, + "concurrent_strategy": "", } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { t.Error(err) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 3a1d0c27d..72c56eae2 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -44,6 +44,8 @@ type GeneralJsonCfg struct { Digest_equal *string Rsr_separator *string Max_parralel_conns *int + Concurrent_requests *int + Concurrent_strategy *string } // Listen config section diff --git a/data/conf/samples/conc_reqs_busy/cgrates.json b/data/conf/samples/conc_reqs_busy/cgrates.json new file mode 100644 index 000000000..ab3095175 --- /dev/null +++ b/data/conf/samples/conc_reqs_busy/cgrates.json @@ -0,0 +1,42 @@ +{ + + "general": { + "log_level": 7, + "node_id": "ConcurrentBusyEngine", + "reply_timeout": "50s", + "concurrent_requests": 2, + "concurrent_strategy": "*busy" + }, + + + "listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" + }, + + + "data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 6379, // data_db port to reach the database + "db_name": "10" // data_db database name to connect to + }, + + + "stor_db": { + "db_password": "CGRateS.org" + }, + + + "apiers": { + "enabled": true + }, + + + "sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:2014" + }, + + +}, \ No newline at end of file diff --git a/data/conf/samples/conc_reqs_queue/cgrates.json b/data/conf/samples/conc_reqs_queue/cgrates.json new file mode 100644 index 000000000..37eba4cce --- /dev/null +++ b/data/conf/samples/conc_reqs_queue/cgrates.json @@ -0,0 +1,41 @@ +{ + + "general": { + "log_level": 7, + "node_id": "ConcurrentQueueEngine", + "reply_timeout": "50s", + "concurrent_requests": 2, + "concurrent_strategy": "*queue", + }, + + + "listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", + }, + + + "data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 6379, // data_db port to reach the database + "db_name": "10", // data_db database name to connect to + }, + + "stor_db": { + "db_password": "CGRateS.org", + }, + + + "apiers": { + "enabled": true, + }, + + + "sessions": { + "enabled": true, + "listen_bijson": "127.0.0.1:2014" + }, + + +} \ No newline at end of file diff --git a/utils/concureqs.go b/utils/concureqs.go new file mode 100644 index 000000000..8d41f7843 --- /dev/null +++ b/utils/concureqs.go @@ -0,0 +1,69 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package utils + +import ( + "fmt" +) + +var ConReqs *ConcReqs + +type ConcReqs struct { + limit int + strategy string + aReqs chan struct{} +} + +func NewConReqs(reqs int, strategy string) *ConcReqs { + cR := &ConcReqs{ + limit: reqs, + strategy: strategy, + aReqs: make(chan struct{}, reqs), + } + for i := 0; i < reqs; i++ { + cR.aReqs <- struct{}{} + } + return cR +} + +var errDeny = fmt.Errorf("denying request due to maximum active requests reached") + +func (cR *ConcReqs) Allocate() (err error) { + if cR.limit == 0 { + return + } + switch cR.strategy { + case MetaBusy: + if len(cR.aReqs) == 0 { + return errDeny + } + fallthrough + case MetaQueue: + <-cR.aReqs // get from channel + } + return +} + +func (cR *ConcReqs) Deallocate() { + if cR.limit == 0 { + return + } + cR.aReqs <- struct{}{} + return +} diff --git a/utils/concureqs_gob_codec.go b/utils/concureqs_gob_codec.go new file mode 100644 index 000000000..16a3b4690 --- /dev/null +++ b/utils/concureqs_gob_codec.go @@ -0,0 +1,96 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +// Most of the logic follows standard library implementation in this file +package utils + +import ( + "bufio" + "encoding/gob" + "io" + "log" + "net/rpc" +) + +type concReqsGobServerCodec struct { + rwc io.ReadWriteCloser + dec *gob.Decoder + enc *gob.Encoder + encBuf *bufio.Writer + closed bool + allocated bool // populated if we have allocated a channel for concurrent requests +} + +func NewConcReqsGobServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + buf := bufio.NewWriter(conn) + return &concReqsGobServerCodec{ + rwc: conn, + dec: gob.NewDecoder(conn), + enc: gob.NewEncoder(buf), + encBuf: buf, + } +} + +func (c *concReqsGobServerCodec) ReadRequestHeader(r *rpc.Request) error { + return c.dec.Decode(r) +} + +func (c *concReqsGobServerCodec) ReadRequestBody(body interface{}) error { + if err := ConReqs.Allocate(); err != nil { + return err + } + c.allocated = true + return c.dec.Decode(body) +} + +func (c *concReqsGobServerCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) { + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } + if err = c.enc.Encode(r); err != nil { + if c.encBuf.Flush() == nil { + // Gob couldn't encode the header. Should not happen, so if it does, + // shut down the connection to signal that the connection is broken. + log.Println("rpc: gob error encoding response:", err) + c.Close() + } + return + } + if err = c.enc.Encode(body); err != nil { + if c.encBuf.Flush() == nil { + // Was a gob problem encoding the body but the header has been written. + // Shut down the connection to signal that the connection is broken. + log.Println("rpc: gob error encoding body:", err) + c.Close() + } + return + } + return c.encBuf.Flush() +} + +func (c *concReqsGobServerCodec) Close() error { + if c.closed { + // Only call c.rwc.Close once; otherwise the semantics are undefined. + return nil + } + c.closed = true + return c.rwc.Close() +} diff --git a/utils/concureqs_json_codec.go b/utils/concureqs_json_codec.go new file mode 100644 index 000000000..125d661ac --- /dev/null +++ b/utils/concureqs_json_codec.go @@ -0,0 +1,133 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +// Most of the logic follows standard library implementation in this file +package utils + +import ( + "encoding/json" + "errors" + "io" + "net/rpc" + "sync" +) + +type concReqsServerCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req serverRequest + + // JSON-RPC clients can use arbitrary json values as request IDs. + // Package rpc expects uint64 request IDs. + // We assign uint64 sequence numbers to incoming requests + // but save the original request ID in the pending map. + // When rpc responds, we use the sequence number in + // the response to find the original request ID. + mutex sync.Mutex // protects seq, pending + seq uint64 + pending map[uint64]*json.RawMessage + + allocated bool // populated if we have allocated a channel for concurrent requests +} + +// NewConcReqsServerCodec returns a new rpc.ServerCodec using JSON-RPC on conn. +func NewConcReqsServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { + return &concReqsServerCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]*json.RawMessage), + } +} + +func (c *concReqsServerCodec) ReadRequestHeader(r *rpc.Request) error { + c.req.reset() + if err := c.dec.Decode(&c.req); err != nil { + return err + } + r.ServiceMethod = c.req.Method + + // JSON request id can be any JSON value; + // RPC package expects uint64. Translate to + // internal uint64 and save JSON on the side. + c.mutex.Lock() + c.seq++ + c.pending[c.seq] = c.req.Id + c.req.Id = nil + r.Seq = c.seq + c.mutex.Unlock() + + return nil +} + +func (c *concReqsServerCodec) ReadRequestBody(x interface{}) error { + if err := ConReqs.Allocate(); err != nil { + return err + } + c.allocated = true + if x == nil { + return nil + } + if c.req.Params == nil { + return errMissingParams + } + // JSON params is array value. + // RPC params is struct. + // Unmarshal into array containing struct for now. + // Should think about making RPC more general. + var params [1]interface{} + params[0] = x + return json.Unmarshal(*c.req.Params, ¶ms) +} + +func (c *concReqsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { + if c.allocated { + defer func() { + ConReqs.Deallocate() + c.allocated = false + }() + } + + c.mutex.Lock() + b, ok := c.pending[r.Seq] + if !ok { + c.mutex.Unlock() + return errors.New("invalid sequence number in response") + } + delete(c.pending, r.Seq) + c.mutex.Unlock() + + if b == nil { + // Invalid request so no id. Use JSON null. + b = &null + } + resp := serverResponse{Id: b} + if r.Error == "" { + resp.Result = x + } else { + resp.Error = r.Error + } + return c.enc.Encode(resp) +} + +func (c *concReqsServerCodec) Close() error { + return c.c.Close() +} diff --git a/utils/consts.go b/utils/consts.go index ed6bcf81e..ee2b0b706 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -659,6 +659,8 @@ const ( MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" FileName = "FileName" + MetaBusy = "*busy" + MetaQueue = "*Queue" ) // Migrator Action @@ -1202,6 +1204,7 @@ const ( CoreSv1 = "CoreSv1" CoreSv1Status = "CoreSv1.Status" CoreSv1Ping = "CoreSv1.Ping" + CoreSv1Sleep = "CoreSv1.Sleep" ) // SupplierS APIs @@ -1306,6 +1309,7 @@ const ( SessionSv1ActivateSessions = "SessionSv1.ActivateSessions" SessionSv1DeactivateSessions = "SessionSv1.DeactivateSessions" SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" + SessionSv1Sleep = "SessionSv1.Sleep" ) // Responder APIs @@ -1562,30 +1566,32 @@ var ( // GeneralCfg const ( - NodeIDCfg = "node_id" - LoggerCfg = "logger" - LogLevelCfg = "log_level" - HttpSkipTlsVerifyCfg = "http_skip_tls_verify" - RoundingDecimalsCfg = "rounding_decimals" - DBDataEncodingCfg = "dbdata_encoding" - TpExportPathCfg = "tpexport_dir" - PosterAttemptsCfg = "poster_attempts" - FailedPostsDirCfg = "failed_posts_dir" - FailedPostsTTLCfg = "failed_posts_ttl" - DefaultReqTypeCfg = "default_request_type" - DefaultCategoryCfg = "default_category" - DefaultTenantCfg = "default_tenant" - DefaultTimezoneCfg = "default_timezone" - DefaultCachingCfg = "default_caching" - ConnectAttemptsCfg = "connect_attempts" - ReconnectsCfg = "reconnects" - ConnectTimeoutCfg = "connect_timeout" - ReplyTimeoutCfg = "reply_timeout" - LockingTimeoutCfg = "locking_timeout" - DigestSeparatorCfg = "digest_separator" - DigestEqualCfg = "digest_equal" - RSRSepCfg = "rsr_separator" - MaxParralelConnsCfg = "max_parralel_conns" + NodeIDCfg = "node_id" + LoggerCfg = "logger" + LogLevelCfg = "log_level" + HttpSkipTlsVerifyCfg = "http_skip_tls_verify" + RoundingDecimalsCfg = "rounding_decimals" + DBDataEncodingCfg = "dbdata_encoding" + TpExportPathCfg = "tpexport_dir" + PosterAttemptsCfg = "poster_attempts" + FailedPostsDirCfg = "failed_posts_dir" + FailedPostsTTLCfg = "failed_posts_ttl" + DefaultReqTypeCfg = "default_request_type" + DefaultCategoryCfg = "default_category" + DefaultTenantCfg = "default_tenant" + DefaultTimezoneCfg = "default_timezone" + DefaultCachingCfg = "default_caching" + ConnectAttemptsCfg = "connect_attempts" + ReconnectsCfg = "reconnects" + ConnectTimeoutCfg = "connect_timeout" + ReplyTimeoutCfg = "reply_timeout" + LockingTimeoutCfg = "locking_timeout" + DigestSeparatorCfg = "digest_separator" + DigestEqualCfg = "digest_equal" + RSRSepCfg = "rsr_separator" + MaxParralelConnsCfg = "max_parralel_conns" + ConcurrentRequestsCfg = "concurrent_requests" + ConcurrentStrategyCfg = "concurrent_strategy" ) // StorDbCfg diff --git a/utils/server.go b/utils/server.go index 764471db8..cfd6bed26 100644 --- a/utils/server.go +++ b/utils/server.go @@ -178,7 +178,7 @@ func (s *Server) ServeJSON(addr string, exitChan chan bool) { if s.isDispatched { go rpc.ServeCodec(NewCustomJSONServerCodec(conn)) } else { - go jsonrpc.ServeConn(conn) + go rpc.ServeCodec(NewConcReqsServerCodec(conn)) } } @@ -217,8 +217,7 @@ func (s *Server) ServeGOB(addr string, exitChan chan bool) { continue } - //utils.Logger.Info(fmt.Sprintf(" New incoming connection: %v", conn.RemoteAddr())) - go rpc.ServeConn(conn) + go rpc.ServeCodec(NewConcReqsGobServerCodec(conn)) } } @@ -281,7 +280,7 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, if s.isDispatched { rpc.ServeCodec(NewCustomJSONServerCodec(ws)) } else { - jsonrpc.ServeConn(ws) + rpc.ServeCodec(NewConcReqsServerCodec(ws)) } }) if useBasicAuth { @@ -378,7 +377,7 @@ func (r *rpcRequest) Close() error { // Call invokes the RPC request, waits for it to complete, and returns the results. func (r *rpcRequest) Call() io.Reader { - go jsonrpc.ServeConn(r) + go rpc.ServeCodec(NewConcReqsServerCodec(r)) <-r.done return r.rw }