From b6dde967f2042baba04710d790ba4563fd5619a8 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 24 Jan 2014 17:35:57 +0200 Subject: [PATCH 1/9] samll engine fix --- cmd/cgr-engine/cgr-engine.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ea60d3b31..7f2667b66 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -58,7 +58,7 @@ var ( cfgPath = flag.String("config", "/etc/cgrates/cgrates.cfg", "Configuration file location.") version = flag.Bool("version", false, "Prints the application version.") raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config") - schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config") + schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon .overwriting config") cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config") cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config") mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config") @@ -168,7 +168,7 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) { } func startHistoryAgent(scribeServer history.Scribe) { - if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here + if cfg.HistoryAgentEnabled && cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here engine.Logger.Info("Starting History Agent.") for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) @@ -367,7 +367,6 @@ func main() { } server.RpcRegisterName("Scribe", scribeServer) } - go startHistoryAgent(scribeServer) go server.ServeGOB(cfg.RPCGOBListen) go server.ServeJSON(cfg.RPCJSONListen) From 5b424d0e7051e9bc42a8c72e27ba5e17c510f488 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 25 Jan 2014 10:46:21 +0100 Subject: [PATCH 2/9] Engine components sync via chans --- cmd/cgr-engine/cgr-engine.go | 137 +++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 45 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7f2667b66..e5d56743f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -66,13 +66,28 @@ var ( bal = balancer2go.NewBalancer() exitChan = make(chan bool) server = &engine.Server{} + scribeServer history.Scribe sm sessionmanager.SessionManager medi *mediator.Mediator cfg *config.CGRConfig err error ) -func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) { +func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) { + if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) + exitChan <- true + return + } + if err := accountDb.CacheAccounting(nil); err != nil { + engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) + exitChan <- true + return + } + close(doneChan) +} + +func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, chanDone chan struct{}) { var connector engine.Connector if cfg.MediatorRater == INTERNAL { connector = responder @@ -90,6 +105,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not connect to engine: %v", err)) exitChan <- true + return } connector = &engine.RPCClientConnector{Client: client} } @@ -98,7 +114,9 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD if err != nil { engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err)) exitChan <- true + return } + close(chanDone) } func startCdrc() { @@ -150,26 +168,36 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage exitChan <- true } -func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) { +func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) { if cfg.CDRSMediator == INTERNAL { - for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable - time.Sleep(time.Duration(i+1) * time.Second) - if medi != nil { // Got our mediator, no need to wait any longer - break - } - } + <-mediChan // Deadlock if mediator not started if medi == nil { engine.Logger.Crit(" Could not connect to mediator, exiting.") exitChan <- true + return } } cs := cdrs.New(cdrDb, medi, cfg) cs.RegisterHanlersToServer(server) + close(doneChan) } -func startHistoryAgent(scribeServer history.Scribe) { - if cfg.HistoryAgentEnabled && cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here - engine.Logger.Info("Starting History Agent.") +func startHistoryServer(chanDone chan struct{}) { + if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + exitChan <- true + return + } + server.RpcRegisterName("Scribe", scribeServer) + close(chanDone) +} + +// chanStartServer will report when server is up, useful for internal requests +func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) { + if cfg.HistoryServer == INTERNAL { // For internal requests, wait for server to come online before connecting + engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) + <-chanServerStarted // If server is not enabled, will have deadlock here + } else { // Connect in iteration since there are chances of concurrency here for i := 0; i < 3; i++ { //ToDo: Make it globally configurable //engine.Logger.Crit(fmt.Sprintf(" Trying to connect, iteration: %d, time %s", i, time.Now())) if scribeServer, err = history.NewProxyScribe(cfg.HistoryServer); err == nil { @@ -179,13 +207,32 @@ func startHistoryAgent(scribeServer history.Scribe) { exitChan <- true return } - time.Sleep(time.Duration(i+1) * time.Second) + time.Sleep(time.Duration(i) * time.Second) } } engine.SetHistoryScribe(scribeServer) return } +// Starts the rpc server, waiting for the necessary components to finish their tasks +func serveRpc(rpcWaitChans []chan struct{}) { + for _, chn := range rpcWaitChans { + <-chn + } + // Each of the serve block so need to start in their own goroutine + go server.ServeJSON(cfg.RPCJSONListen) + go server.ServeGOB(cfg.RPCGOBListen) + +} + +// Starts the http server, waiting for the necessary components to finish their tasks +func serveHttp(httpWaitChans []chan struct{}) { + for _, chn := range httpWaitChans { + <-chn + } + server.ServeHTTP(cfg.HTTPListen) +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" { engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!") @@ -301,18 +348,17 @@ func main() { stopHandled := false + // Async starts here + + rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed + httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed + if cfg.RaterEnabled { // Cache rating if rater enabled - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) - return - } - if err := accountDb.CacheAccounting(nil); err != nil { - engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) - return - } + cacheChan := make(chan struct{}) + rpcWait = append(rpcWait, cacheChan) + go cacheData(ratingDb, accountDb, cacheChan) } - // Async starts here if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled { go registerToBalancer() go stopRaterSignalHandler() @@ -329,7 +375,7 @@ func main() { } if cfg.BalancerEnabled { - engine.Logger.Info("Registering CGRateS Balancer service") + engine.Logger.Info("Registering CGRateS Balancer service.") go stopBalancerSignalHandler() stopHandled = true responder.Bal = bal @@ -356,47 +402,48 @@ func main() { }() } - var scribeServer history.Scribe - + var histServChan chan struct{} // Will be initialized only if the server starts if cfg.HistoryServerEnabled { - engine.Logger.Info("Registering CGRates History service") - if scribeServer, err = history.NewFileScribe(cfg.HistoryDir, cfg.HistorySaveInterval); err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - return - } - server.RpcRegisterName("Scribe", scribeServer) + histServChan = make(chan struct{}) + rpcWait = append(rpcWait, histServChan) + go startHistoryServer(histServChan) } - go server.ServeGOB(cfg.RPCGOBListen) - go server.ServeJSON(cfg.RPCJSONListen) + if cfg.HistoryAgentEnabled { + engine.Logger.Info("Starting CGRateS History Agent.") + go startHistoryAgent(scribeServer, histServChan) + } - go startHistoryAgent(scribeServer) + var medChan chan struct{} + if cfg.MediatorEnabled { + engine.Logger.Info("Starting CGRateS Mediator service.") + medChan = make(chan struct{}) + go startMediator(responder, logDb, cdrDb, medChan) + } if cfg.CDRSEnabled { - engine.Logger.Info("Registering CGRateS CDR service") - go startCDRS(responder, cdrDb) - } - - go server.ServeHTTP(cfg.HTTPListen) - - if cfg.MediatorEnabled { - engine.Logger.Info("Starting CGRateS Mediator service") - go startMediator(responder, logDb, cdrDb) + engine.Logger.Info("Starting CGRateS CDRS service.") + cdrsChan := make(chan struct{}) + httpWait = append(httpWait, cdrsChan) + go startCDRS(responder, cdrDb, medChan, cdrsChan) } if cfg.SMEnabled { - engine.Logger.Info("Starting CGRateS SessionManager service") + engine.Logger.Info("Starting CGRateS SessionManager service.") go startSessionManager(responder, logDb) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler() } if cfg.CdrcEnabled { - engine.Logger.Info("Starting CGRateS CDR client") + engine.Logger.Info("Starting CGRateS CDR client.") go startCdrc() } + // Start the servers + go serveRpc(rpcWait) + go serveHttp(httpWait) + <-exitChan if *pidFile != "" { if err := os.Remove(*pidFile); err != nil { From 9210b20924d975827b3c120e8d5523cc649fa587 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 25 Jan 2014 10:49:00 +0100 Subject: [PATCH 3/9] Fmt on sources :( --- apier/v1/accounts.go | 25 ++++----- apier/v1/apier.go | 8 +-- apier/v1/apier_local_test.go | 90 ++++++++++++++---------------- apier/v1/tpactiontimings.go | 4 +- apier/v1/tutfscsv_local_test.go | 11 ++-- cdrc/cdrc_test.go | 2 +- cmd/cgr-engine/cgr-engine.go | 2 +- config/config.go | 8 +-- engine/action.go | 21 ++++--- engine/action_timing.go | 10 ++-- engine/actions_test.go | 10 ++-- engine/storage_sql.go | 2 +- engine/tpimporter_csv.go | 2 +- sessionmanager/fssessionmanager.go | 3 +- utils/apitpdata.go | 8 +-- utils/coreutils.go | 2 +- 16 files changed, 96 insertions(+), 112 deletions(-) diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 49efdb106..c96b7f653 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -21,8 +21,8 @@ package apier import ( "errors" "fmt" - "github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" "time" ) @@ -33,10 +33,10 @@ type AttrAcntAction struct { } type AccountActionTiming struct { - Id string // The id to reference this particular ActionTiming + Id string // The id to reference this particular ActionTiming ActionPlanId string // The id of the ActionPlanId profile attached to the account - ActionsId string // The id of actions which will be executed - NextExecTime time.Time // Next execution time + ActionsId string // The id of actions which will be executed + NextExecTime time.Time // Next execution time } func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*AccountActionTiming) error { @@ -60,12 +60,12 @@ func (self *ApierV1) GetAccountActionPlan(attrs AttrAcntAction, reply *[]*Accoun } type AttrRemActionTiming struct { - ActionPlanId string // Id identifying the ActionTimings profile + ActionPlanId string // Id identifying the ActionTimings profile ActionTimingId string // Internal CGR id identifying particular ActionTiming, *all for all user related ActionTimings to be canceled Tenant string // Tenant he account belongs to Account string // Account name Direction string // Traffic direction - ReloadScheduler bool // If set it will reload the scheduler after adding + ReloadScheduler bool // If set it will reload the scheduler after adding } // Removes an ActionTimings or parts of it depending on filters being set @@ -155,12 +155,11 @@ func (self *ApierV1) RemAccountActionTriggers(attrs AttrRemAcntActionTriggers, r return nil } - type AttrSetAccount struct { - Tenant string - Direction string - Account string - Type string // <*prepaid|*postpaid> + Tenant string + Direction string + Account string + Type string // <*prepaid|*postpaid> ActionPlanId string } @@ -186,8 +185,8 @@ func (self *ApierV1) SetAccount(attr AttrSetAccount, reply *string) error { Type: attr.Type, } } - - if len(attr.ActionPlanId) != 0 { + + if len(attr.ActionPlanId) != 0 { var err error ats, err = self.AccountDb.GetActionTimings(attr.ActionPlanId) if err != nil { diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 09593b9da..d045ffe25 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -120,7 +120,7 @@ func (self *ApierV1) AddBalance(attr *AttrAddBalance, reply *string) error { if attr.Overwrite { aType = engine.TOPUP_RESET } - at.SetActions(engine.Actions{&engine.Action{ActionType: aType, BalanceId: attr.BalanceId, Direction: attr.Direction, + at.SetActions(engine.Actions{&engine.Action{ActionType: aType, BalanceId: attr.BalanceId, Direction: attr.Direction, Balance: &engine.Balance{Value: attr.Value, Weight: attr.Weight}}}) if err := at.Execute(); err != nil { *reply = err.Error() @@ -288,8 +288,8 @@ func (self *ApierV1) SetActions(attrs AttrSetActions, reply *string) error { } type AttrSetActionPlan struct { - Id string // Profile id - ActionPlan []*ApiActionTiming // Set of actions this Actions profile will perform + Id string // Profile id + ActionPlan []*ApiActionTiming // Set of actions this Actions profile will perform Overwrite bool // If previously defined, will be overwritten ReloadScheduler bool // Enables automatic reload of the scheduler (eg: useful when adding a single action timing) } @@ -444,7 +444,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { self.Sched.Restart() *reply = OK return nil - + } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 6e086049e..9d96d3150 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -24,15 +24,15 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "net/http" "net/rpc" + "net/url" "os/exec" "path" "reflect" + "strings" "testing" "time" - "net/http" - "net/url" - "strings" ) // ToDo: Replace rpc.Client with internal rpc server and Apier using internal map as both data and stor so we can run the tests non-local @@ -929,7 +929,6 @@ func TestApierAddBalance(t *testing.T) { t.Errorf("Calling ApierV1.AddBalance received: %s", reply) } - } // Test here ExecuteAction @@ -1013,15 +1012,13 @@ func TestApierAddTriggeredAction(t *testing.T) { } } - - // Test here GetAccountActionTriggers func TestApierGetAccountActionTriggers(t *testing.T) { if !*testLocal { return } var reply engine.ActionTriggerPriotityList - req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan2", Direction: "*out"} + req := AttrAcntAction{Tenant: "cgrates.org", Account: "dan2", Direction: "*out"} if err := rater.Call("ApierV1.GetAccountActionTriggers", req, &reply); err != nil { t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) } else if len(reply) != 1 || reply[0].ActionsId != "WARN_VIA_HTTP" { @@ -1029,7 +1026,6 @@ func TestApierGetAccountActionTriggers(t *testing.T) { } } - // Test here RemAccountActionTriggers func TestApierRemAccountActionTriggers(t *testing.T) { if !*testLocal { @@ -1037,14 +1033,14 @@ func TestApierRemAccountActionTriggers(t *testing.T) { } // Test first get so we can steal the id which we need to remove var reply engine.ActionTriggerPriotityList - req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan2", Direction: "*out"} + req := AttrAcntAction{Tenant: "cgrates.org", Account: "dan2", Direction: "*out"} if err := rater.Call("ApierV1.GetAccountActionTriggers", req, &reply); err != nil { t.Error("Got error on ApierV1.GetAccountActionTimings: ", err.Error()) } else if len(reply) != 1 || reply[0].ActionsId != "WARN_VIA_HTTP" { t.Errorf("Unexpected action triggers received %v", reply) } var rmReply string - rmReq := AttrRemAcntActionTriggers{Tenant: "cgrates.org", Account:"dan2", Direction: "*out", ActionTriggerId: reply[0].Id} + rmReq := AttrRemAcntActionTriggers{Tenant: "cgrates.org", Account: "dan2", Direction: "*out", ActionTriggerId: reply[0].Id} if err := rater.Call("ApierV1.RemAccountActionTriggers", rmReq, &rmReply); err != nil { t.Error("Got error on ApierV1.RemActionTiming: ", err.Error()) } else if rmReply != OK { @@ -1057,37 +1053,35 @@ func TestApierRemAccountActionTriggers(t *testing.T) { } } - // Test here SetAccount func TestApierSetAccount(t *testing.T) { - if !*testLocal { - return - } - reply := "" - attrs := &AttrSetAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan7", Type: "*prepaid", ActionPlanId: "ATMS_1"} - if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { - t.Error("Got error on ApierV1.SetAccount: ", err.Error()) - } else if reply != "OK" { - t.Errorf("Calling ApierV1.SetAccount received: %s", reply) - } - reply2 := "" - attrs2 := new(AttrSetAccount) - *attrs2 = *attrs - attrs2.ActionPlanId = "DUMMY_DATA" // Does not exist so it should error when adding triggers on it - // Add account with actions timing which does not exist - if err := rater.Call("ApierV1.SetAccount", attrs2, &reply2); err == nil || reply2 == "OK" { // OK is not welcomed - t.Error("Expecting error on ApierV1.SetAccount.", err, reply2) - } + if !*testLocal { + return + } + reply := "" + attrs := &AttrSetAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan7", Type: "*prepaid", ActionPlanId: "ATMS_1"} + if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { + t.Error("Got error on ApierV1.SetAccount: ", err.Error()) + } else if reply != "OK" { + t.Errorf("Calling ApierV1.SetAccount received: %s", reply) + } + reply2 := "" + attrs2 := new(AttrSetAccount) + *attrs2 = *attrs + attrs2.ActionPlanId = "DUMMY_DATA" // Does not exist so it should error when adding triggers on it + // Add account with actions timing which does not exist + if err := rater.Call("ApierV1.SetAccount", attrs2, &reply2); err == nil || reply2 == "OK" { // OK is not welcomed + t.Error("Expecting error on ApierV1.SetAccount.", err, reply2) + } } - // Test here GetAccountActionTimings func TestApierGetAccountActionPlan(t *testing.T) { if !*testLocal { return } var reply []*AccountActionTiming - req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan7", Direction: "*out"} + req := AttrAcntAction{Tenant: "cgrates.org", Account: "dan7", Direction: "*out"} if err := rater.Call("ApierV1.GetAccountActionPlan", req, &reply); err != nil { t.Error("Got error on ApierV1.GetAccountActionPlan: ", err.Error()) } else if len(reply) != 1 { @@ -1099,21 +1093,20 @@ func TestApierGetAccountActionPlan(t *testing.T) { } } - // Test here RemActionTiming func TestApierRemActionTiming(t *testing.T) { if !*testLocal { return } var rmReply string - rmReq := AttrRemActionTiming{ActionPlanId: "ATMS_1", Tenant: "cgrates.org", Account:"dan4", Direction: "*out"} + rmReq := AttrRemActionTiming{ActionPlanId: "ATMS_1", Tenant: "cgrates.org", Account: "dan4", Direction: "*out"} if err := rater.Call("ApierV1.RemActionTiming", rmReq, &rmReply); err != nil { t.Error("Got error on ApierV1.RemActionTiming: ", err.Error()) } else if rmReply != OK { t.Error("Unexpected answer received", rmReply) } var reply []*AccountActionTiming - req := AttrAcntAction{Tenant: "cgrates.org", Account:"dan4", Direction: "*out"} + req := AttrAcntAction{Tenant: "cgrates.org", Account: "dan4", Direction: "*out"} if err := rater.Call("ApierV1.GetAccountActionPlan", req, &reply); err != nil { t.Error("Got error on ApierV1.GetAccountActionPlan: ", err.Error()) } else if len(reply) != 0 { @@ -1166,12 +1159,12 @@ func TestTriggersExecute(t *testing.T) { return } reply := "" - attrs := &AttrSetAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan8", Type: "*prepaid"} - if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { - t.Error("Got error on ApierV1.SetAccount: ", err.Error()) - } else if reply != "OK" { - t.Errorf("Calling ApierV1.SetAccount received: %s", reply) - } + attrs := &AttrSetAccount{Tenant: "cgrates.org", Direction: "*out", Account: "dan8", Type: "*prepaid"} + if err := rater.Call("ApierV1.SetAccount", attrs, &reply); err != nil { + t.Error("Got error on ApierV1.SetAccount: ", err.Error()) + } else if reply != "OK" { + t.Errorf("Calling ApierV1.SetAccount received: %s", reply) + } attrAddBlnc := &AttrAddBalance{Tenant: "cgrates.org", Account: "1008", BalanceId: "*monetary", Direction: "*out", Value: 2} if err := rater.Call("ApierV1.AddBalance", attrAddBlnc, &reply); err != nil { t.Error("Got error on ApierV1.AddBalance: ", err.Error()) @@ -1224,14 +1217,14 @@ func TestResponderGetCost(t *testing.T) { func TestCdrServer(t *testing.T) { if !*testLocal { - return - } + return + } httpClient := new(http.Client) - cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + cdrForm1 := url.Values{"accid": []string{"dsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} - cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, - "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, + cdrForm2 := url.Values{"accid": []string{"adsafdsaf"}, "cdrhost": []string{"192.168.1.1"}, "reqtype": []string{"rated"}, "direction": []string{"*out"}, + "tenant": []string{"cgrates.org"}, "tor": []string{"call"}, "account": []string{"1001"}, "subject": []string{"1001"}, "destination": []string{"1002"}, "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) @@ -1243,8 +1236,8 @@ func TestCdrServer(t *testing.T) { func TestExportCdrsToFile(t *testing.T) { if !*testLocal { - return - } + return + } var reply *utils.ExportedFileCdrs req := utils.AttrExpFileCdrs{} if err := rater.Call("ApierV1.ExportCdrsToFile", req, &reply); err == nil || !strings.HasPrefix(err.Error(), utils.ERR_MANDATORY_IE_MISSING) { @@ -1273,7 +1266,6 @@ func TestExportCdrsToFile(t *testing.T) { */ } - // Simply kill the engine after we are done with tests within this file func TestStopEngine(t *testing.T) { if !*testLocal { diff --git a/apier/v1/tpactiontimings.go b/apier/v1/tpactiontimings.go index ec9210fb9..ca5c94807 100644 --- a/apier/v1/tpactiontimings.go +++ b/apier/v1/tpactiontimings.go @@ -43,8 +43,8 @@ func (self *ApierV1) SetTPActionPlan(attrs utils.TPActionPlan, reply *string) er } type AttrGetTPActionPlan struct { - TPid string // Tariff plan id - Id string // ActionTimings id + TPid string // Tariff plan id + Id string // ActionTimings id } // Queries specific ActionPlan profile on tariff plan diff --git a/apier/v1/tutfscsv_local_test.go b/apier/v1/tutfscsv_local_test.go index 612ef82d3..ddf976c67 100644 --- a/apier/v1/tutfscsv_local_test.go +++ b/apier/v1/tutfscsv_local_test.go @@ -19,18 +19,17 @@ along with this program. If not, see package apier import ( - "testing" "fmt" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "net/rpc/jsonrpc" "os/exec" "path" - "time" "reflect" + "testing" + "time" ) - // Empty tables before using them func TestFsCsvCreateTables(t *testing.T) { if !*testLocal { @@ -107,7 +106,6 @@ func TestFsCsvRpcConn(t *testing.T) { } } - // Make sure we start with fresh data func TestFsCsvEmptyCache(t *testing.T) { if !*testLocal { @@ -159,7 +157,7 @@ func TestFsCsvCall1(t *testing.T) { } tStart := time.Date(2014, 01, 15, 6, 0, 0, 0, time.UTC) tEnd := time.Date(2014, 01, 15, 6, 0, 35, 0, time.UTC) - cd := engine.CallDescriptor { + cd := engine.CallDescriptor{ Direction: "*out", TOR: "call", Tenant: "cgrates.org", @@ -168,7 +166,7 @@ func TestFsCsvCall1(t *testing.T) { Destination: "1002", TimeStart: tStart, TimeEnd: tEnd, - CallDuration: 35, + CallDuration: 35, } var cc engine.CallCost // Simple test that command is executed without errors @@ -179,7 +177,6 @@ func TestFsCsvCall1(t *testing.T) { } } - // Simply kill the engine after we are done with tests within this file func TestFsCsvStopEngine(t *testing.T) { if !*testLocal { diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 87f72cf4e..2ed6a9271 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -50,7 +50,7 @@ func TestParseFieldsConfig(t *testing.T) { cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"} cdrc = &Cdrc{cgrCfg: cgrConfig} if err := cdrc.parseFieldsConfig(); err != nil { - t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields) + t.Errorf("Failed to corectly parse extra fields %v", cdrc.cfgCdrFields) } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index e5d56743f..5e03a6fa0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -219,7 +219,7 @@ func serveRpc(rpcWaitChans []chan struct{}) { for _, chn := range rpcWaitChans { <-chn } - // Each of the serve block so need to start in their own goroutine + // Each of the serve blocks so need to start in their own goroutine go server.ServeJSON(cfg.RPCJSONListen) go server.ServeGOB(cfg.RPCGOBListen) diff --git a/config/config.go b/config/config.go index bdfa88b82..8e6779467 100644 --- a/config/config.go +++ b/config/config.go @@ -137,10 +137,10 @@ type CGRConfig struct { HistoryServerEnabled bool // Starts History as server: . HistoryDir string // Location on disk where to store history files. HistorySaveInterval time.Duration // The timout duration between history writes - MailerServer string // The server to use when sending emails out - MailerAuthUser string // Authenticate to email server using this user - MailerAuthPass string // Authenticate to email server with this password - MailerFromAddr string // From address used when sending emails out + MailerServer string // The server to use when sending emails out + MailerAuthUser string // Authenticate to email server using this user + MailerAuthPass string // Authenticate to email server with this password + MailerFromAddr string // From address used when sending emails out } func (self *CGRConfig) setDefaults() error { diff --git a/engine/action.go b/engine/action.go index dd93249d6..9360ba7a9 100644 --- a/engine/action.go +++ b/engine/action.go @@ -21,15 +21,15 @@ package engine import ( "bytes" "encoding/json" - "fmt" "errors" + "fmt" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "net/http" "net/smtp" "sort" - "time" "strings" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/utils" + "time" ) /* @@ -101,7 +101,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { } func logAction(ub *UserBalance, a *Action) (err error) { - ubMarshal,_ := json.Marshal(ub) + ubMarshal, _ := json.Marshal(ub) Logger.Info(fmt.Sprintf("Threshold reached, balance: %s", ubMarshal)) return } @@ -214,7 +214,7 @@ func callUrlAsync(ub *UserBalance, a *Action) error { } time.Sleep(time.Duration(i) * time.Minute) } - + }() return nil } @@ -238,8 +238,8 @@ func mailAsync(ub *UserBalance, a *Action) error { } toAddrStr += addr } - message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson)) - auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer,":")[0]) // We only need host part, so ignore port + message := []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), ubJson)) + auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port go func() { for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort if err := smtp.SendMail(cgrCfg.MailerServer, auth, cgrCfg.MailerFromAddr, toAddrs, message); err == nil { @@ -250,11 +250,10 @@ func mailAsync(ub *UserBalance, a *Action) error { } time.Sleep(time.Duration(i) * time.Minute) } - }() + }() return nil } - - + // Structure to store actions according to weight type Actions []*Action diff --git a/engine/action_timing.go b/engine/action_timing.go index 5df8db1bb..b5d362ea1 100644 --- a/engine/action_timing.go +++ b/engine/action_timing.go @@ -20,11 +20,11 @@ package engine import ( "fmt" + "github.com/cgrates/cgrates/utils" "sort" "strconv" "strings" "time" - "github.com/cgrates/cgrates/utils" ) const ( @@ -299,12 +299,12 @@ func (at *ActionTiming) String_DISABLED() string { // Helper to remove ActionTiming members based on specific filters, empty data means no always match func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPlan { for idx, at := range ats { - if len(actionTimingId)!=0 && at.Id!=actionTimingId { // No Match for ActionTimingId, no need to move further + if len(actionTimingId) != 0 && at.Id != actionTimingId { // No Match for ActionTimingId, no need to move further continue } if len(balanceId) == 0 { // No account defined, considered match for complete removal if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionTiming,0) + return make([]*ActionTiming, 0) } ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] continue @@ -313,9 +313,9 @@ func RemActionTiming(ats ActionPlan, actionTimingId, balanceId string) ActionPla if blncId == balanceId { if len(at.UserBalanceIds) == 1 { // Only one balance, remove complete at if len(ats) == 1 { // Removing last item, by init empty - return make([]*ActionTiming,0) + return make([]*ActionTiming, 0) } - ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] + ats[idx], ats = ats[len(ats)-1], ats[:len(ats)-1] } else { at.UserBalanceIds[iBlnc], at.UserBalanceIds = at.UserBalanceIds[len(at.UserBalanceIds)-1], at.UserBalanceIds[:len(at.UserBalanceIds)-1] } diff --git a/engine/actions_test.go b/engine/actions_test.go index 57c8bc9b3..1a8dcad2b 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -427,21 +427,21 @@ func TestActionTimingsRemoveMember(t *testing.T) { Tag: "test", UserBalanceIds: []string{"one", "two", "three"}, ActionsId: "TEST_ACTIONS", - } + } at2 := &ActionTiming{ Id: "some uuid22", Tag: "test2", UserBalanceIds: []string{"three", "four"}, ActionsId: "TEST_ACTIONS2", - } + } ats := ActionPlan{at1, at2} - if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 { + if outAts := RemActionTiming(ats, "", "four"); len(outAts[1].UserBalanceIds) != 1 { t.Error("Expecting fewer balance ids", outAts[1].UserBalanceIds) } if ats = RemActionTiming(ats, "", "three"); len(ats) != 1 { t.Error("Expecting fewer actionTimings", ats) } - if ats = RemActionTiming(ats, "some_uuid22", "");len(ats) != 1 { + if ats = RemActionTiming(ats, "some_uuid22", ""); len(ats) != 1 { t.Error("Expecting fewer actionTimings members", ats) } ats2 := ActionPlan{at1, at2} @@ -450,8 +450,6 @@ func TestActionTimingsRemoveMember(t *testing.T) { } } - - func TestActionTriggerMatchNil(t *testing.T) { at := &ActionTrigger{ Direction: OUTBOUND, diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 7fcd09aee..1e7f4ae9a 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -1119,7 +1119,7 @@ func (self *SQLStorage) GetTpAccountActions(aaFltr *utils.TPAccountActions) (map Tenant: tenant, Account: account, Direction: direction, - ActionPlanId: action_timings_tag, + ActionPlanId: action_timings_tag, ActionTriggersId: action_triggers_tag, } aa[aacts.KeyId()] = aacts diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index 5fc8bb9a5..48499b987 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -47,7 +47,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{ utils.RATING_PLANS_CSV: (*TPCSVImporter).importRatingPlans, utils.RATING_PROFILES_CSV: (*TPCSVImporter).importRatingProfiles, utils.ACTIONS_CSV: (*TPCSVImporter).importActions, - utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings, + utils.ACTION_PLANS_CSV: (*TPCSVImporter).importActionTimings, utils.ACTION_TRIGGERS_CSV: (*TPCSVImporter).importActionTriggers, utils.ACCOUNT_ACTIONS_CSV: (*TPCSVImporter).importAccountActions, } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e80b39b8d..cf1e81043 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -133,7 +133,6 @@ func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration return nil } - // Sends the transfer command to unpark the call to freeswitch func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) @@ -175,7 +174,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { Destination: ev.GetDestination(), TimeStart: startTime, TimeEnd: startTime.Add(cfg.SMMaxCallDuration), - } + } var remainingDurationFloat float64 err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) if err != nil { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index b1e0f58b1..bc9475266 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -221,9 +221,9 @@ type TPAction struct { } type TPActionPlan struct { - TPid string // Tariff plan id - Id string // ActionPlan id - ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group + TPid string // Tariff plan id + Id string // ActionPlan id + ActionPlan []*TPActionTiming // Set of ActionTiming bindings this profile will group } type TPActionTiming struct { @@ -266,7 +266,7 @@ type TPAccountActions struct { Tenant string // Tenant's Id Account string // Account name Direction string // Traffic direction - ActionPlanId string // Id of ActionPlan profile to use + ActionPlanId string // Id of ActionPlan profile to use ActionTriggersId string // Id of ActionTriggers profile to use } diff --git a/utils/coreutils.go b/utils/coreutils.go index 7b49a8e0d..5d9023f6d 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -184,6 +184,6 @@ func ParseDurationWithSecs(durStr string) (time.Duration, error) { return time.ParseDuration(durStr) } -func BalanceKey(tenant, account, direction string ) string { +func BalanceKey(tenant, account, direction string) string { return fmt.Sprintf("%s:%s:%s", direction, tenant, account) } From 8177e64f8bfb5a962986954602c86104549888e9 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 24 Jan 2014 19:15:49 +0200 Subject: [PATCH 4/9] new histtory tests --- cmd/cgr-loader/cgr-loader.go | 6 +- engine/calldesc.go | 3 + engine/destinations.go | 5 ++ engine/history_test.go | 30 +++++---- engine/ratingplan.go | 5 ++ engine/ratingprofile.go | 5 ++ engine/storage_map.go | 11 ++- engine/storage_mongo.go | 11 ++- engine/storage_redis.go | 7 +- history/file_scribe.go | 126 +++++++++-------------------------- history/mock_scribe.go | 81 +++++----------------- history/proxy_scribe.go | 19 ++++-- history/scribe.go | 70 ++++++++++++++----- history/scribe_test.go | 36 +++++----- 14 files changed, 179 insertions(+), 236 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index c835e9233..6060e552a 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -67,8 +67,8 @@ var ( stats = flag.Bool("stats", false, "Generates statsistics about given data.") fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") - historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving") - raterAddress = flag.String("rater_address", cgrConfig.MediatorRater, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") + historyServer = flag.String("history_server", cgrConfig.RPCGOBListen, "The history server address:port, empty to disable automaticautomatic history archiving") + raterAddress = flag.String("rater_address", cgrConfig.RPCGOBListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -157,8 +157,8 @@ func main() { log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) return } else { + gob.Register(engine.Destination{}) engine.SetHistoryScribe(scribeAgent) - gob.Register(&engine.Destination{}) defer scribeAgent.Client.Close() } } else { diff --git a/engine/calldesc.go b/engine/calldesc.go index 817b3ce86..16ce1fd64 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -96,6 +96,9 @@ func SetDebitPeriod(d time.Duration) { // Exported method to set the history scribe. func SetHistoryScribe(scribe history.Scribe) { + history.RegisterRecordFilename(&Destination{}) + history.RegisterRecordFilename(&RatingPlan{}) + history.RegisterRecordFilename(&RatingProfile{}) historyScribe = scribe } diff --git a/engine/destinations.go b/engine/destinations.go index e4e9162e1..819fbc2be 100644 --- a/engine/destinations.go +++ b/engine/destinations.go @@ -53,3 +53,8 @@ func (d *Destination) String() (result string) { func (d *Destination) AddPrefix(pfx string) { d.Prefixes = append(d.Prefixes, pfx) } + +// history record method +func (d *Destination) GetId() string { + return d.Id +} diff --git a/engine/history_test.go b/engine/history_test.go index a432493dd..19f8a2712 100644 --- a/engine/history_test.go +++ b/engine/history_test.go @@ -27,24 +27,26 @@ import ( func TestHistoryRatinPlans(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - if !strings.Contains(scribe.RpBuf.String(), `{"Key":"*out:vdf:0:minu","Object":{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}}`) { - t.Error("Error in destination history content:", scribe.RpBuf.String()) + buf := scribe.BufMap["ratingprofiles.json"] + if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) { + t.Error("Error in destination history content:", buf.String()) } } func TestHistoryDestinations(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - expected := `[{"Key":"ALL","Object":{"Id":"ALL","Prefixes":["49","41","43"]}} -{"Key":"GERMANY","Object":{"Id":"GERMANY","Prefixes":["49"]}} -{"Key":"GERMANY_O2","Object":{"Id":"GERMANY_O2","Prefixes":["41"]}} -{"Key":"GERMANY_PREMIUM","Object":{"Id":"GERMANY_PREMIUM","Prefixes":["43"]}} -{"Key":"NAT","Object":{"Id":"NAT","Prefixes":["0256","0257","0723"]}} -{"Key":"PSTN_70","Object":{"Id":"PSTN_70","Prefixes":["+4970"]}} -{"Key":"PSTN_71","Object":{"Id":"PSTN_71","Prefixes":["+4971"]}} -{"Key":"PSTN_72","Object":{"Id":"PSTN_72","Prefixes":["+4972"]}} -{"Key":"RET","Object":{"Id":"RET","Prefixes":["0723","0724"]}} -{"Key":"nat","Object":{"Id":"nat","Prefixes":["0257","0256","0723"]}}]` - if scribe.DestBuf.String() != expected { - t.Error("Error in destination history content:", scribe.DestBuf.String()) + buf := scribe.BufMap["destinations.json"] + expected := `[{"Id":"ALL","Prefixes":["49","41","43"]}, +{"Id":"GERMANY","Prefixes":["49"]}, +{"Id":"GERMANY_O2","Prefixes":["41"]}, +{"Id":"GERMANY_PREMIUM","Prefixes":["43"]}, +{"Id":"NAT","Prefixes":["0256","0257","0723"]}, +{"Id":"PSTN_70","Prefixes":["+4970"]}, +{"Id":"PSTN_71","Prefixes":["+4971"]}, +{"Id":"PSTN_72","Prefixes":["+4972"]}, +{"Id":"RET","Prefixes":["0723","0724"]}, +{"Id":"nat","Prefixes":["0257","0256","0723"]}]` + if buf.String() != expected { + t.Error("Error in destination history content:", buf.String()) } } diff --git a/engine/ratingplan.go b/engine/ratingplan.go index 5db3fc4b7..a33b9a6d5 100644 --- a/engine/ratingplan.go +++ b/engine/ratingplan.go @@ -97,3 +97,8 @@ func (rp *RatingPlan) AddRateInterval(dId string, ris ...*RateInterval) { func (rp *RatingPlan) Equal(o *RatingPlan) bool { return rp.Id == o.Id } + +// history record method +func (rp *RatingPlan) GetId() string { + return rp.Id +} diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 869c55ed1..159e5ea2e 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -146,3 +146,8 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error) return errors.New("not found") } + +// history record method +func (rpf *RatingProfile) GetId() string { + return rpf.Id +} diff --git a/engine/storage_map.go b/engine/storage_map.go index f4169251e..071966b36 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -22,11 +22,10 @@ import ( "errors" "fmt" - "github.com/cgrates/cgrates/cache2go" - "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/utils" "strings" "time" + "github.com/cgrates/cgrates/cache2go" + "github.com/cgrates/cgrates/utils" ) type MapStorage struct { @@ -123,7 +122,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { result, err := ms.ms.Marshal(rp) ms.dict[RATING_PLAN_PREFIX+rp.Id] = result response := 0 - go historyScribe.Record(&history.Record{RATING_PLAN_PREFIX + rp.Id, rp}, &response) + go historyScribe.Record(rp, &response) cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -151,7 +150,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { result, err := ms.ms.Marshal(rpf) ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 - go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response) + go historyScribe.Record(rpf, &response) cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -180,7 +179,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { result, err := ms.ms.Marshal(dest) ms.dict[DESTINATION_PREFIX+dest.Id] = result response := 0 - go historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) + go historyScribe.Record(dest, &response) cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index da69f652e..6438ae9dd 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -20,11 +20,10 @@ package engine import ( "fmt" - "github.com/cgrates/cgrates/history" - "labix.org/v2/mgo" - "labix.org/v2/mgo/bson" "log" "time" + "labix.org/v2/mgo" + "labix.org/v2/mgo/bson" ) type MongoStorage struct { @@ -137,7 +136,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{RATING_PLAN_PREFIX + rp.Id, rp}, &response) + historyScribe.Record(rp, &response) } return ms.db.C("ratingplans").Insert(rp) } @@ -151,7 +150,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) + historyScribe.Record(rp, &response) } return ms.db.C("ratingprofiles").Insert(rp) } @@ -168,7 +167,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err func (ms *MongoStorage) SetDestination(dest *Destination) error { if historyScribe != nil { response := 0 - historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) + historyScribe.Record(dest, &response) } return ms.db.C("destinations").Insert(dest) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index cbf2fdde2..99b2adefb 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -25,7 +25,6 @@ import ( "fmt" "github.com/cgrates/cgrates/cache2go" - "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" "github.com/hoisie/redis" @@ -195,7 +194,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{RATING_PLAN_PREFIX + rp.Id, rp}, &response) + go historyScribe.Record(rp, &response) } //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return @@ -223,7 +222,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response) + go historyScribe.Record(rpf, &response) } //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return @@ -272,7 +271,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{DESTINATION_PREFIX + dest.Id, dest}, &response) + go historyScribe.Record(dest, &response) } //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return diff --git a/history/file_scribe.go b/history/file_scribe.go index 0db5ec0f9..8dbe175b8 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -22,31 +22,22 @@ import ( "bufio" "encoding/json" "errors" + "fmt" "io" "os" "os/exec" "path/filepath" - "strings" "sync" "time" ) -const ( - DESTINATIONS_FILE = "destinations.json" - RATING_PLANS_FILE = "rating_plans.json" - RATING_PROFILES_FILE = "rating_profiles.json" -) - type FileScribe struct { - mu sync.Mutex - fileRoot string - gitCommand string - destinations records - ratingPlans records - ratingProfiles records - loopChecker chan int - waitingFile string - savePeriod time.Duration + mu sync.Mutex + fileRoot string + gitCommand string + loopChecker chan int + waitingFile string + savePeriod time.Duration } func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, error) { @@ -58,31 +49,20 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er s := &FileScribe{fileRoot: fileRoot, gitCommand: gitCommand, savePeriod: saveInterval} s.loopChecker = make(chan int) s.gitInit() - if err := s.load(DESTINATIONS_FILE); err != nil { - return nil, err - } - if err := s.load(RATING_PLANS_FILE); err != nil { - return nil, err - } - if err := s.load(RATING_PROFILES_FILE); err != nil { - return nil, err + + for fn, _ := range recordsMap { + if err := s.load(fn); err != nil { + return nil, err + } } return s, nil } -func (s *FileScribe) Record(rec *Record, out *int) error { +func (s *FileScribe) Record(rec Record, out *int) error { s.mu.Lock() - var fileToSave string - switch { - case strings.HasPrefix(rec.Key, DESTINATION_PREFIX): - s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object}) - fileToSave = DESTINATIONS_FILE - case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX): - s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object}) - fileToSave = RATING_PLANS_FILE - case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX): - s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object}) - fileToSave = RATING_PROFILES_FILE + fileToSave := GetRFN(rec) + if records, ok := recordsMap[fileToSave]; ok { + records.SetOrAdd(rec) } // flood protection for save method (do not save on every loop iteration) @@ -126,21 +106,14 @@ func (s *FileScribe) gitInit() error { if out, err := cmd.Output(); err != nil { return errors.New(string(out) + " " + err.Error()) } - if f, err := os.Create(filepath.Join(s.fileRoot, DESTINATIONS_FILE)); err != nil { - return errors.New(" Error writing destinations file: " + err.Error()) - } else { - f.Close() - } - if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PLANS_FILE)); err != nil { - return errors.New(" Error writing rating plans file: " + err.Error()) - } else { - f.Close() - } - if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PROFILES_FILE)); err != nil { - return errors.New(" Error writing rating profiles file: " + err.Error()) - } else { - f.Close() + for fn, _ := range recordsMap { + if f, err := os.Create(filepath.Join(s.fileRoot, fn)); err != nil { + return fmt.Errorf(" Error writing %s file: %s", fn, err.Error()) + } else { + f.Close() + } } + cmd = exec.Command(s.gitCommand, "add", ".") cmd.Dir = s.fileRoot if out, err := cmd.Output(); err != nil { @@ -169,23 +142,11 @@ func (s *FileScribe) load(filename string) error { defer f.Close() d := json.NewDecoder(f) - switch filename { - case DESTINATIONS_FILE: - if err := d.Decode(&s.destinations); err != nil && err != io.EOF { - return errors.New(" Error loading destinations: " + err.Error()) - } - s.destinations.Sort() - case RATING_PLANS_FILE: - if err := d.Decode(&s.ratingPlans); err != nil && err != io.EOF { - return errors.New(" Error loading rating plans: " + err.Error()) - } - s.ratingPlans.Sort() - case RATING_PROFILES_FILE: - if err := d.Decode(&s.ratingProfiles); err != nil && err != io.EOF { - return errors.New(" Error loading rating profiles: " + err.Error()) - } - s.ratingProfiles.Sort() + records := recordsMap[filename] + if err := d.Decode(&records); err != nil && err != io.EOF { + return fmt.Errorf(" Error loading %s: %s", filename, err.Error()) } + records.Sort() return nil } @@ -198,38 +159,11 @@ func (s *FileScribe) save(filename string) error { } b := bufio.NewWriter(f) - switch filename { - case DESTINATIONS_FILE: - if err := s.format(b, s.destinations); err != nil { - return err - } - case RATING_PLANS_FILE: - if err := s.format(b, s.ratingPlans); err != nil { - return err - } - case RATING_PROFILES_FILE: - if err := s.format(b, s.ratingProfiles); err != nil { - return err - } + records := recordsMap[filename] + if err := format(b, records); err != nil { + return err } b.Flush() f.Close() return s.gitCommit() } - -func (s *FileScribe) format(b io.Writer, recs records) error { - recs.Sort() - b.Write([]byte("[")) - for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) - if i < len(recs)-1 { - b.Write([]byte(",\n")) - } - } - b.Write([]byte("]")) - return nil -} diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 0f1caf649..3524340fd 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -21,85 +21,38 @@ package history import ( "bufio" "bytes" - "encoding/json" - "io" - "strings" "sync" ) type MockScribe struct { - mu sync.Mutex - destinations records - ratingPlans records - ratingProfiles records - DestBuf bytes.Buffer - RplBuf bytes.Buffer - RpBuf bytes.Buffer + mu sync.Mutex + BufMap map[string]*bytes.Buffer } func NewMockScribe() (*MockScribe, error) { - return &MockScribe{}, nil + return &MockScribe{BufMap: map[string]*bytes.Buffer{ + "destinations.json": bytes.NewBuffer(nil), + "ratingplans.json": bytes.NewBuffer(nil), + "ratingprofiles.json": bytes.NewBuffer(nil), + }}, nil } -func (s *MockScribe) Record(rec *Record, out *int) error { - switch { - case strings.HasPrefix(rec.Key, DESTINATION_PREFIX): - s.destinations = s.destinations.SetOrAdd(&Record{rec.Key[len(DESTINATION_PREFIX):], rec.Object}) - s.save(DESTINATIONS_FILE) - case strings.HasPrefix(rec.Key, RATING_PLAN_PREFIX): - s.ratingPlans = s.ratingPlans.SetOrAdd(&Record{rec.Key[len(RATING_PLAN_PREFIX):], rec.Object}) - s.save(RATING_PLANS_FILE) - case strings.HasPrefix(rec.Key, RATING_PROFILE_PREFIX): - s.ratingProfiles = s.ratingProfiles.SetOrAdd(&Record{rec.Key[len(RATING_PROFILE_PREFIX):], rec.Object}) - s.save(RATING_PROFILES_FILE) - } - *out = 0 +func (s *MockScribe) Record(rec Record, out *int) error { + fn := GetRFN(rec) + recordsMap[fn] = recordsMap[fn].SetOrAdd(rec) + s.save(fn) return nil } func (s *MockScribe) save(filename string) error { s.mu.Lock() defer s.mu.Unlock() - switch filename { - case DESTINATIONS_FILE: - s.DestBuf.Reset() - b := bufio.NewWriter(&s.DestBuf) - defer b.Flush() - if err := s.format(b, s.destinations); err != nil { - return err - } - case RATING_PLANS_FILE: - s.RplBuf.Reset() - b := bufio.NewWriter(&s.RplBuf) - defer b.Flush() - if err := s.format(b, s.ratingPlans); err != nil { - return err - } - case RATING_PROFILES_FILE: - s.RpBuf.Reset() - b := bufio.NewWriter(&s.RpBuf) - defer b.Flush() - if err := s.format(b, s.ratingProfiles); err != nil { - return err - } + records := recordsMap[filename] + s.BufMap[filename].Reset() + b := bufio.NewWriter(s.BufMap[filename]) + defer b.Flush() + if err := format(b, records); err != nil { + return err } - - return nil -} - -func (s *MockScribe) format(b io.Writer, recs records) error { - recs.Sort() - b.Write([]byte("[")) - for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) - if i < len(recs)-1 { - b.Write([]byte("\n")) - } - } - b.Write([]byte("]")) return nil } diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index 31fd3efb6..c7be783ea 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -18,11 +18,10 @@ along with this program. If not, see package history -import "net/rpc" - -const ( - JSON = "json" - GOB = "gob" +import ( + "encoding/gob" + "log" + "net/rpc" ) type ProxyScribe struct { @@ -38,6 +37,12 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) { return &ProxyScribe{Client: client}, nil } -func (ps *ProxyScribe) Record(rec *Record, out *int) error { - return ps.Client.Call("Scribe.Record", rec, out) +func RRR(r interface{}) { + gob.Register(r) +} + +func (ps *ProxyScribe) Record(rec Record, out *int) error { + err := ps.Client.Call("Scribe.Record", &rec, out) + log.Printf("Result for %v: %v", rec, err) + return err } diff --git a/history/scribe.go b/history/scribe.go index 8391feb0e..97c8a29b1 100644 --- a/history/scribe.go +++ b/history/scribe.go @@ -19,25 +19,27 @@ along with this program. If not, see package history import ( + "encoding/json" + "io" + "reflect" "sort" -) - -const ( - RATING_PLAN_PREFIX = "rpl_" - RATING_PROFILE_PREFIX = "rpf_" - DESTINATION_PREFIX = "dst_" + "strings" ) type Scribe interface { - Record(record *Record, out *int) error + Record(Record, *int) error } -type Record struct { - Key string - Object interface{} +type Record interface { + GetId() string } -type records []*Record +type records []Record + +var ( + recordsMap = make(map[string]records) + filenameMap = make(map[reflect.Type]string) +) func (rs records) Len() int { return len(rs) @@ -48,19 +50,19 @@ func (rs records) Swap(i, j int) { } func (rs records) Less(i, j int) bool { - return rs[i].Key < rs[j].Key + return rs[i].GetId() < rs[j].GetId() } func (rs records) Sort() { sort.Sort(rs) } -func (rs records) SetOrAdd(rec *Record) records { +func (rs records) SetOrAdd(rec Record) records { //rs.Sort() n := len(rs) - i := sort.Search(n, func(i int) bool { return rs[i].Key >= rec.Key }) - if i < n && rs[i].Key == rec.Key { - rs[i].Object = rec.Object + i := sort.Search(n, func(i int) bool { return rs[i].GetId() >= rec.GetId() }) + if i < n && rs[i].GetId() == rec.GetId() { + rs[i] = rec } else { // i is the index where it would be inserted. rs = append(rs, nil) @@ -70,10 +72,10 @@ func (rs records) SetOrAdd(rec *Record) records { return rs } -func (rs records) SetOrAddOld(rec *Record) records { +/*func (rs records) SetOrAdd(rec Record) records { found := false for _, r := range rs { - if r.Key == rec.Key { + if r.GetId() == rec.GetId() { found = true r.Object = rec.Object return rs @@ -83,4 +85,36 @@ func (rs records) SetOrAddOld(rec *Record) records { rs = append(rs, rec) } return rs +}*/ + +func format(b io.Writer, recs records) error { + recs.Sort() + b.Write([]byte("[")) + for i, r := range recs { + src, err := json.Marshal(r) + if err != nil { + return err + } + b.Write(src) + if i < len(recs)-1 { + b.Write([]byte(",\n")) + } + } + b.Write([]byte("]")) + return nil } + +func GetRFN(rec Record) string { + if fn, ok := filenameMap[reflect.TypeOf(rec)]; ok { + return fn + } else { + typ := reflect.TypeOf(rec) + typeSegments := strings.Split(typ.String(), ".") + fn = strings.ToLower(typeSegments[len(typeSegments)-1]) + "s.json" + filenameMap[typ] = fn + recordsMap[fn] = make(records, 0) + return fn + } +} + +var RegisterRecordFilename = GetRFN // will create a key in filename and records map diff --git a/history/scribe_test.go b/history/scribe_test.go index 8f586dcf5..1fa264f34 100644 --- a/history/scribe_test.go +++ b/history/scribe_test.go @@ -23,18 +23,28 @@ import ( "testing" ) +type TestRecord struct { + Id string +} + +func (tr *TestRecord) GetId() string { + return tr.Id +} + func TestHistorySet(t *testing.T) { - rs := records{&Record{"first", "test"}} - rs.SetOrAdd(&Record{"first", "new value"}) - if len(rs) != 1 || rs[0].Object != "new value" { + rs := records{&TestRecord{"first"}} + second := &TestRecord{"first"} + rs.SetOrAdd(second) + if len(rs) != 1 || rs[0] != second { t.Error("error setting new value: ", rs[0]) } } func TestHistoryAdd(t *testing.T) { - rs := records{&Record{"first", "test"}} - rs = rs.SetOrAdd(&Record{"second", "new value"}) - if len(rs) != 2 || rs[1].Object != "new value" { + rs := records{&TestRecord{"first"}} + second := &TestRecord{"second"} + rs = rs.SetOrAdd(second) + if len(rs) != 2 || rs[1] != second { t.Error("error setting new value: ", rs) } } @@ -42,19 +52,9 @@ func TestHistoryAdd(t *testing.T) { func BenchmarkSetOrAdd(b *testing.B) { var rs records for i := 0; i < 1000; i++ { - rs = rs.SetOrAdd(&Record{strconv.Itoa(i), strconv.Itoa(i)}) + rs = rs.SetOrAdd(&TestRecord{strconv.Itoa(i)}) } for i := 0; i < b.N; i++ { - rs.SetOrAdd(&Record{"400", "test"}) - } -} - -func BenchmarkSetOrAddOld(b *testing.B) { - var rs records - for i := 0; i < 1000; i++ { - rs = rs.SetOrAddOld(&Record{strconv.Itoa(i), strconv.Itoa(i)}) - } - for i := 0; i < b.N; i++ { - rs.SetOrAddOld(&Record{"400", "test"}) + rs.SetOrAdd(&TestRecord{"400"}) } } From 3bbb67c72ffdd314159c1e98795ee19dd48d6c98 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 25 Jan 2014 11:48:35 +0200 Subject: [PATCH 5/9] working variant for history server and gob enc --- cmd/cgr-loader/cgr-loader.go | 2 -- engine/calldesc.go | 3 -- engine/destinations.go | 16 ++++++++-- engine/history_test.go | 4 +-- engine/ratingplan.go | 15 +++++++-- engine/ratingprofile.go | 11 +++++-- engine/storage_map.go | 6 ++-- engine/storage_mongo.go | 7 +++-- engine/storage_redis.go | 6 ++-- history/file_scribe.go | 8 ++--- history/mock_scribe.go | 10 +++--- history/proxy_scribe.go | 14 ++------- history/scribe.go | 60 ++++++++++-------------------------- history/scribe_test.go | 20 ++++-------- 14 files changed, 79 insertions(+), 103 deletions(-) diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 6060e552a..8d6a44c7c 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -19,7 +19,6 @@ along with this program. If not, see package main import ( - "encoding/gob" "flag" "fmt" "log" @@ -157,7 +156,6 @@ func main() { log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) return } else { - gob.Register(engine.Destination{}) engine.SetHistoryScribe(scribeAgent) defer scribeAgent.Client.Close() } diff --git a/engine/calldesc.go b/engine/calldesc.go index 16ce1fd64..817b3ce86 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -96,9 +96,6 @@ func SetDebitPeriod(d time.Duration) { // Exported method to set the history scribe. func SetHistoryScribe(scribe history.Scribe) { - history.RegisterRecordFilename(&Destination{}) - history.RegisterRecordFilename(&RatingPlan{}) - history.RegisterRecordFilename(&RatingProfile{}) historyScribe = scribe } diff --git a/engine/destinations.go b/engine/destinations.go index 819fbc2be..baf4e06fa 100644 --- a/engine/destinations.go +++ b/engine/destinations.go @@ -18,7 +18,12 @@ along with this program. If not, see package engine -import "strings" +import ( + "encoding/json" + "strings" + + "github.com/cgrates/cgrates/history" +) /* Structure that gathers multiple destination prefixes under a common id. @@ -55,6 +60,11 @@ func (d *Destination) AddPrefix(pfx string) { } // history record method -func (d *Destination) GetId() string { - return d.Id +func (d *Destination) GetHistoryRecord() history.Record { + js, _ := json.Marshal(d) + return history.Record{ + Id: d.Id, + Filename: history.DESTINATIONS_FN, + Payload: js, + } } diff --git a/engine/history_test.go b/engine/history_test.go index 19f8a2712..aa599ae6e 100644 --- a/engine/history_test.go +++ b/engine/history_test.go @@ -27,7 +27,7 @@ import ( func TestHistoryRatinPlans(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - buf := scribe.BufMap["ratingprofiles.json"] + buf := scribe.BufMap[history.RATING_PROFILES_FN] if !strings.Contains(buf.String(), `{"Id":"*out:vdf:0:minu","RatingPlanActivations":[{"ActivationTime":"2012-01-01T00:00:00Z","RatingPlanId":"EVENING","FallbackKeys":null}]}`) { t.Error("Error in destination history content:", buf.String()) } @@ -35,7 +35,7 @@ func TestHistoryRatinPlans(t *testing.T) { func TestHistoryDestinations(t *testing.T) { scribe := historyScribe.(*history.MockScribe) - buf := scribe.BufMap["destinations.json"] + buf := scribe.BufMap[history.DESTINATIONS_FN] expected := `[{"Id":"ALL","Prefixes":["49","41","43"]}, {"Id":"GERMANY","Prefixes":["49"]}, {"Id":"GERMANY_O2","Prefixes":["41"]}, diff --git a/engine/ratingplan.go b/engine/ratingplan.go index a33b9a6d5..d0ddf2249 100644 --- a/engine/ratingplan.go +++ b/engine/ratingplan.go @@ -18,6 +18,12 @@ along with this program. If not, see package engine +import ( + "encoding/json" + + "github.com/cgrates/cgrates/history" +) + /* The struture that is saved to storage. */ @@ -99,6 +105,11 @@ func (rp *RatingPlan) Equal(o *RatingPlan) bool { } // history record method -func (rp *RatingPlan) GetId() string { - return rp.Id +func (rp *RatingPlan) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rp) + return history.Record{ + Id: rp.Id, + Filename: history.RATING_PLANS_FN, + Payload: js, + } } diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 159e5ea2e..12bd6ea5e 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -19,12 +19,14 @@ along with this program. If not, see package engine import ( + "encoding/json" "errors" "fmt" "sort" "time" "github.com/cgrates/cgrates/cache2go" + "github.com/cgrates/cgrates/history" "github.com/cgrates/cgrates/utils" ) @@ -148,6 +150,11 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error) } // history record method -func (rpf *RatingProfile) GetId() string { - return rpf.Id +func (rpf *RatingProfile) GetHistoryRecord() history.Record { + js, _ := json.Marshal(rpf) + return history.Record{ + Id: rpf.Id, + Filename: history.RATING_PROFILES_FN, + Payload: js, + } } diff --git a/engine/storage_map.go b/engine/storage_map.go index 071966b36..78127a4f5 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -122,7 +122,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { result, err := ms.ms.Marshal(rp) ms.dict[RATING_PLAN_PREFIX+rp.Id] = result response := 0 - go historyScribe.Record(rp, &response) + go historyScribe.Record(rp.GetHistoryRecord(), &response) cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -150,7 +150,7 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { result, err := ms.ms.Marshal(rpf) ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 - go historyScribe.Record(rpf, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } @@ -179,7 +179,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { result, err := ms.ms.Marshal(dest) ms.dict[DESTINATION_PREFIX+dest.Id] = result response := 0 - go historyScribe.Record(dest, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 6438ae9dd..f136ad71b 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "time" + "labix.org/v2/mgo" "labix.org/v2/mgo/bson" ) @@ -136,7 +137,7 @@ func (ms *MongoStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { if historyScribe != nil { response := 0 - historyScribe.Record(rp, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingplans").Insert(rp) } @@ -150,7 +151,7 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { if historyScribe != nil { response := 0 - historyScribe.Record(rp, &response) + historyScribe.Record(rp.GetHistoryRecord(), &response) } return ms.db.C("ratingprofiles").Insert(rp) } @@ -167,7 +168,7 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err func (ms *MongoStorage) SetDestination(dest *Destination) error { if historyScribe != nil { response := 0 - historyScribe.Record(dest, &response) + historyScribe.Record(dest.GetHistoryRecord(), &response) } return ms.db.C("destinations").Insert(dest) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 99b2adefb..807d75e17 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -194,7 +194,7 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { err = rs.db.Set(RATING_PLAN_PREFIX+rp.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rp, &response) + go historyScribe.Record(rp.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return @@ -222,7 +222,7 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(rpf, &response) + go historyScribe.Record(rpf.GetHistoryRecord(), &response) } //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return @@ -271,7 +271,7 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { err = rs.db.Set(DESTINATION_PREFIX+dest.Id, b.Bytes()) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(dest, &response) + go historyScribe.Record(dest.GetHistoryRecord(), &response) } //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return diff --git a/history/file_scribe.go b/history/file_scribe.go index 8dbe175b8..5694a97dc 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -50,7 +50,7 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er s.loopChecker = make(chan int) s.gitInit() - for fn, _ := range recordsMap { + for _, fn := range []string{DESTINATIONS_FN, RATING_PLANS_FN, RATING_PROFILES_FN} { if err := s.load(fn); err != nil { return nil, err } @@ -60,10 +60,8 @@ func NewFileScribe(fileRoot string, saveInterval time.Duration) (*FileScribe, er func (s *FileScribe) Record(rec Record, out *int) error { s.mu.Lock() - fileToSave := GetRFN(rec) - if records, ok := recordsMap[fileToSave]; ok { - records.SetOrAdd(rec) - } + fileToSave := rec.Filename + recordsMap[fileToSave] = recordsMap[fileToSave].SetOrAdd(&rec) // flood protection for save method (do not save on every loop iteration) if s.waitingFile == fileToSave { diff --git a/history/mock_scribe.go b/history/mock_scribe.go index 3524340fd..808298a8e 100644 --- a/history/mock_scribe.go +++ b/history/mock_scribe.go @@ -31,15 +31,15 @@ type MockScribe struct { func NewMockScribe() (*MockScribe, error) { return &MockScribe{BufMap: map[string]*bytes.Buffer{ - "destinations.json": bytes.NewBuffer(nil), - "ratingplans.json": bytes.NewBuffer(nil), - "ratingprofiles.json": bytes.NewBuffer(nil), + DESTINATIONS_FN: bytes.NewBuffer(nil), + RATING_PLANS_FN: bytes.NewBuffer(nil), + RATING_PROFILES_FN: bytes.NewBuffer(nil), }}, nil } func (s *MockScribe) Record(rec Record, out *int) error { - fn := GetRFN(rec) - recordsMap[fn] = recordsMap[fn].SetOrAdd(rec) + fn := rec.Filename + recordsMap[fn] = recordsMap[fn].SetOrAdd(&rec) s.save(fn) return nil } diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index c7be783ea..0d5b1b3dd 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -18,11 +18,7 @@ along with this program. If not, see package history -import ( - "encoding/gob" - "log" - "net/rpc" -) +import "net/rpc" type ProxyScribe struct { Client *rpc.Client @@ -37,12 +33,6 @@ func NewProxyScribe(addr string) (*ProxyScribe, error) { return &ProxyScribe{Client: client}, nil } -func RRR(r interface{}) { - gob.Register(r) -} - func (ps *ProxyScribe) Record(rec Record, out *int) error { - err := ps.Client.Call("Scribe.Record", &rec, out) - log.Printf("Result for %v: %v", rec, err) - return err + return ps.Client.Call("Scribe.Record", rec, out) } diff --git a/history/scribe.go b/history/scribe.go index 97c8a29b1..e1bb9c3a4 100644 --- a/history/scribe.go +++ b/history/scribe.go @@ -19,22 +19,28 @@ along with this program. If not, see package history import ( - "encoding/json" "io" "reflect" "sort" - "strings" +) + +const ( + DESTINATIONS_FN = "destinations.json" + RATING_PLANS_FN = "rating_plans.json" + RATING_PROFILES_FN = "rating_profiles.json" ) type Scribe interface { Record(Record, *int) error } -type Record interface { - GetId() string +type Record struct { + Id string + Filename string + Payload []byte } -type records []Record +type records []*Record var ( recordsMap = make(map[string]records) @@ -50,18 +56,18 @@ func (rs records) Swap(i, j int) { } func (rs records) Less(i, j int) bool { - return rs[i].GetId() < rs[j].GetId() + return rs[i].Id < rs[j].Id } func (rs records) Sort() { sort.Sort(rs) } -func (rs records) SetOrAdd(rec Record) records { +func (rs records) SetOrAdd(rec *Record) records { //rs.Sort() n := len(rs) - i := sort.Search(n, func(i int) bool { return rs[i].GetId() >= rec.GetId() }) - if i < n && rs[i].GetId() == rec.GetId() { + i := sort.Search(n, func(i int) bool { return rs[i].Id >= rec.Id }) + if i < n && rs[i].Id == rec.Id { rs[i] = rec } else { // i is the index where it would be inserted. @@ -72,30 +78,11 @@ func (rs records) SetOrAdd(rec Record) records { return rs } -/*func (rs records) SetOrAdd(rec Record) records { - found := false - for _, r := range rs { - if r.GetId() == rec.GetId() { - found = true - r.Object = rec.Object - return rs - } - } - if !found { - rs = append(rs, rec) - } - return rs -}*/ - func format(b io.Writer, recs records) error { recs.Sort() b.Write([]byte("[")) for i, r := range recs { - src, err := json.Marshal(r) - if err != nil { - return err - } - b.Write(src) + b.Write(r.Payload) if i < len(recs)-1 { b.Write([]byte(",\n")) } @@ -103,18 +90,3 @@ func format(b io.Writer, recs records) error { b.Write([]byte("]")) return nil } - -func GetRFN(rec Record) string { - if fn, ok := filenameMap[reflect.TypeOf(rec)]; ok { - return fn - } else { - typ := reflect.TypeOf(rec) - typeSegments := strings.Split(typ.String(), ".") - fn = strings.ToLower(typeSegments[len(typeSegments)-1]) + "s.json" - filenameMap[typ] = fn - recordsMap[fn] = make(records, 0) - return fn - } -} - -var RegisterRecordFilename = GetRFN // will create a key in filename and records map diff --git a/history/scribe_test.go b/history/scribe_test.go index 1fa264f34..31011b98f 100644 --- a/history/scribe_test.go +++ b/history/scribe_test.go @@ -23,17 +23,9 @@ import ( "testing" ) -type TestRecord struct { - Id string -} - -func (tr *TestRecord) GetId() string { - return tr.Id -} - func TestHistorySet(t *testing.T) { - rs := records{&TestRecord{"first"}} - second := &TestRecord{"first"} + rs := records{&Record{Id: "first"}} + second := &Record{Id: "first"} rs.SetOrAdd(second) if len(rs) != 1 || rs[0] != second { t.Error("error setting new value: ", rs[0]) @@ -41,8 +33,8 @@ func TestHistorySet(t *testing.T) { } func TestHistoryAdd(t *testing.T) { - rs := records{&TestRecord{"first"}} - second := &TestRecord{"second"} + rs := records{&Record{Id: "first"}} + second := &Record{Id: "second"} rs = rs.SetOrAdd(second) if len(rs) != 2 || rs[1] != second { t.Error("error setting new value: ", rs) @@ -52,9 +44,9 @@ func TestHistoryAdd(t *testing.T) { func BenchmarkSetOrAdd(b *testing.B) { var rs records for i := 0; i < 1000; i++ { - rs = rs.SetOrAdd(&TestRecord{strconv.Itoa(i)}) + rs = rs.SetOrAdd(&Record{Id: strconv.Itoa(i)}) } for i := 0; i < b.N; i++ { - rs.SetOrAdd(&TestRecord{"400"}) + rs.SetOrAdd(&Record{Id: "400"}) } } From 8733b5219e711b0b40d65b2bda4e6fc64de5959f Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 25 Jan 2014 14:28:03 +0100 Subject: [PATCH 6/9] Mediator and SessionManager wait for cache to come up before starting over internal interface --- cmd/cgr-engine/cgr-engine.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5e03a6fa0..74457c6bf 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -87,9 +87,10 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage close(doneChan) } -func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, chanDone chan struct{}) { +func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, cacheChan, chanDone chan struct{}) { var connector engine.Connector if cfg.MediatorRater == INTERNAL { + <-cacheChan // Cache needs to come up before we are ready connector = responder } else { var client *rpc.Client @@ -132,9 +133,10 @@ func startCdrc() { exitChan <- true // If run stopped, something is bad, stop the application } -func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) { +func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) { var connector engine.Connector if cfg.SMRater == INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries connector = responder } else { var client *rpc.Client @@ -222,7 +224,6 @@ func serveRpc(rpcWaitChans []chan struct{}) { // Each of the serve blocks so need to start in their own goroutine go server.ServeJSON(cfg.RPCJSONListen) go server.ServeGOB(cfg.RPCGOBListen) - } // Starts the http server, waiting for the necessary components to finish their tasks @@ -353,8 +354,9 @@ func main() { rpcWait := make([]chan struct{}, 0) // Rpc server will start as soon as this list is consumed httpWait := make([]chan struct{}, 0) // Http server will start as soon as this list is consumed + var cacheChan chan struct{} if cfg.RaterEnabled { // Cache rating if rater enabled - cacheChan := make(chan struct{}) + cacheChan = make(chan struct{}) rpcWait = append(rpcWait, cacheChan) go cacheData(ratingDb, accountDb, cacheChan) } @@ -418,7 +420,7 @@ func main() { if cfg.MediatorEnabled { engine.Logger.Info("Starting CGRateS Mediator service.") medChan = make(chan struct{}) - go startMediator(responder, logDb, cdrDb, medChan) + go startMediator(responder, logDb, cdrDb, cacheChan, medChan) } if cfg.CDRSEnabled { @@ -430,7 +432,7 @@ func main() { if cfg.SMEnabled { engine.Logger.Info("Starting CGRateS SessionManager service.") - go startSessionManager(responder, logDb) + go startSessionManager(responder, logDb, cacheChan) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler() } From 1bbe5a0a88f21065d265307d6d68e7c858114958 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 25 Jan 2014 20:52:32 +0100 Subject: [PATCH 7/9] CDRc communicating over internal interface with the CDRs --- apier/v1/apier_local_test.go | 2 +- cdrc/cdrc.go | 74 +++++++++++++++++++++++++++++++++--- cdrc/cdrc_local_test.go | 5 ++- cdrs/cdrs.go | 55 ++++++++++++++++----------- cdrs/fscdr.go | 10 ++--- cdrs/fscdr_test.go | 18 ++++----- cmd/cgr-engine/cgr-engine.go | 43 ++++++++++++--------- config/config.go | 3 +- config/config_test.go | 2 +- data/conf/cgrates.cfg | 2 +- engine/storage_map.go | 4 +- utils/consts.go | 2 + utils/ratedcdr.go | 22 +++++++++++ 13 files changed, 174 insertions(+), 68 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index 9d96d3150..febfabf9b 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -1228,7 +1228,7 @@ func TestCdrServer(t *testing.T) { "answer_time": []string{"2013-11-07T08:42:26Z"}, "duration": []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}} for _, cdrForm := range []url.Values{cdrForm1, cdrForm2} { cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL) - if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.CdrcCdrs), cdrForm); err != nil { + if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", "127.0.0.1:2080"), cdrForm); err != nil { t.Error(err.Error()) } } diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 3df8587e6..87510bfc0 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -33,6 +33,7 @@ import ( "strings" "time" + "github.com/cgrates/cgrates/cdrs" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -44,8 +45,8 @@ const ( FS_CSV = "freeswitch_csv" ) -func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { - cdrc := &Cdrc{cgrCfg: config} +func NewCdrc(config *config.CGRConfig, cdrServer *cdrs.CDRS) (*Cdrc, error) { + cdrc := &Cdrc{cgrCfg: config, cdrServer: cdrServer} // Before processing, make sure in and out folders exist for _, dir := range []string{cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { @@ -61,6 +62,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { type Cdrc struct { cgrCfg *config.CGRConfig + cdrServer *cdrs.CDRS cfgCdrFields map[string]string // Key is the name of the field httpClient *http.Client } @@ -140,6 +142,59 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { return v, nil } +// Takes the record out of csv and turns it into http form which can be posted +func (self *Cdrc) cdrAsRatedCdr(record []string) (*utils.RatedCDR, error) { + ratedCdr := &utils.RatedCDR{CdrSource: self.cgrCfg.CdrcSourceId} + var err error + for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { + var fieldVal string + if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) { + fieldVal = cfgFieldVal[1:] + } else if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) { + if cfgFieldIdx, err := strconv.Atoi(cfgFieldVal); err != nil { // Should in theory never happen since we have already parsed config + return nil, err + } else if len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cfgFieldName) + } else { + fieldVal = record[cfgFieldIdx] + } + } else { // Modify here when we add more supported cdr formats + fieldVal = "UNKNOWN" + } + switch cfgFieldName { + case utils.ACCID: + ratedCdr.CgrId = utils.FSCgrId(fieldVal) + ratedCdr.AccId = fieldVal + case utils.REQTYPE: + ratedCdr.ReqType = fieldVal + case utils.DIRECTION: + ratedCdr.Direction = fieldVal + case utils.TENANT: + ratedCdr.Tenant = fieldVal + case utils.TOR: + ratedCdr.TOR = fieldVal + case utils.ACCOUNT: + ratedCdr.Account = fieldVal + case utils.SUBJECT: + ratedCdr.Subject = fieldVal + case utils.DESTINATION: + ratedCdr.Destination = fieldVal + case utils.ANSWER_TIME: + if ratedCdr.AnswerTime, err = utils.ParseTimeDetectLayout(fieldVal); err != nil { + return nil, fmt.Errorf("Cannot parse answer time field, err: %s", err.Error()) + } + case utils.DURATION: + if ratedCdr.Duration, err = utils.ParseDurationWithSecs(fieldVal); err != nil { + return nil, fmt.Errorf("Cannot parse duration field, err: %s", err.Error()) + } + default: // Extra fields will not match predefined so they all show up here + ratedCdr.ExtraFields[cfgFieldName] = fieldVal + } + + } + return ratedCdr, nil +} + // One run over the CDR folder func (self *Cdrc) processCdrDir() error { engine.Logger.Info(fmt.Sprintf(" Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir)) @@ -199,14 +254,21 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue // Other csv related errors, ignore } - cdrAsForm, err := self.cdrAsHttpForm(record) + rawCdr, err := self.cdrAsRatedCdr(record) if err != nil { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue } - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), cdrAsForm); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) - continue + if self.cgrCfg.CdrcCdrs == utils.INTERNAL { + if err := self.cdrServer.ProcessRawCdr(rawCdr); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) + continue + } + } else { // CDRs listening on IP + if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), rawCdr.AsRawCdrHttpForm()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, error: %s", err.Error())) + continue + } } } // Finished with file, move it to processed folder diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go index e063cd209..0cfb95bec 100644 --- a/cdrc/cdrc_local_test.go +++ b/cdrc/cdrc_local_test.go @@ -130,10 +130,13 @@ func TestProcessCdrDir(t *testing.T) { if !*testLocal { return } + if cfg.CdrcCdrs == utils.INTERNAL { // For now we only test over network + return + } if err := startEngine(); err != nil { t.Fatal(err.Error()) } - cdrc, err := NewCdrc(cfg) + cdrc, err := NewCdrc(cfg, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index e9d0118a2..05acc22bc 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -35,36 +35,42 @@ var ( medi *mediator.Mediator ) -func fsCdrHandler(w http.ResponseWriter, r *http.Request) { - body, _ := ioutil.ReadAll(r.Body) - if fsCdr, err := new(FSCdr).New(body); err == nil { - storage.SetCdr(fsCdr) - go func() { //FS will not send us hangup_complete until we have send back the answer to CDR, so we need to handle mediation async - if cfg.CDRSMediator == "internal" { - medi.MediateRawCDR(fsCdr) - } else { - //TODO: use the connection to mediator +// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline +func storeAndMediate(rawCdr utils.RawCDR) error { + if err := storage.SetCdr(rawCdr); err != nil { + return err + } + if cfg.CDRSMediator == utils.INTERNAL { + go func() { + if err := medi.MediateRawCDR(rawCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", err.Error())) } }() - } else { + } + return nil +} + +// Handler for generic cgr cdr http +func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { + cgrCdr, err := utils.NewCgrCdrFromHttpReq(r) + if err != nil { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error())) } + if err := storeAndMediate(cgrCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error())) + } } -func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { - if cgrCdr, err := utils.NewCgrCdrFromHttpReq(r); err == nil { - storage.SetCdr(cgrCdr) - if cfg.CDRSMediator == "internal" { - errMedi := medi.MediateRawCDR(cgrCdr) - if errMedi != nil { - engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) - } - } else { - //TODO: use the connection to mediator - } - } else { +// Handler for fs http +func fsCdrHandler(w http.ResponseWriter, r *http.Request) { + body, _ := ioutil.ReadAll(r.Body) + fsCdr, err := new(FSCdr).New(body) + if err != nil { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error())) } + if err := storeAndMediate(fsCdr); err != nil { + engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error())) + } } type CDRS struct{} @@ -80,3 +86,8 @@ func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) { server.RegisterHttpFunc("/cgr", cgrCdrHandler) server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler) } + +// Used to internally process CDR +func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCDR) error { + return storeAndMediate(rawCdr) +} diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 145631ec6..2981aea4a 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -24,8 +24,8 @@ import ( "fmt" "github.com/cgrates/cgrates/utils" "strconv" - "time" "strings" + "time" ) const ( @@ -172,7 +172,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld rtCdr := new(utils.RatedCDR) rtCdr.MediationRunId = runId rtCdr.Cost = -1.0 // Default for non-rated CDR - if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId)==0 { + if rtCdr.AccId = fsCdr.GetAccId(); len(rtCdr.AccId) == 0 { if fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.ACCID)) } else { // Not mandatory, need to generate here CgrId @@ -181,10 +181,10 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld } else { // hasKey, use it to generate cgrid rtCdr.CgrId = utils.FSCgrId(rtCdr.AccId) } - if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost)==0 && fieldsMandatory { + if rtCdr.CdrHost = fsCdr.GetCdrHost(); len(rtCdr.CdrHost) == 0 && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRHOST)) } - if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource)==0 && fieldsMandatory { + if rtCdr.CdrSource = fsCdr.GetCdrSource(); len(rtCdr.CdrSource) == 0 && fieldsMandatory { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, utils.CDRSOURCE)) } if strings.HasPrefix(reqTypeFld, utils.STATIC_VALUE_PREFIX) { // Values starting with prefix are not dynamically populated @@ -232,7 +232,7 @@ func (fsCdr FSCdr) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld return nil, err } } - if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX){ + if durStr, hasKey = fsCdr[durationFld]; !hasKey && fieldsMandatory && !strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) { return nil, errors.New(fmt.Sprintf("%s:%s", utils.ERR_MANDATORY_IE_MISSING, durationFld)) } else { if strings.HasPrefix(durationFld, utils.STATIC_VALUE_PREFIX) { diff --git a/cdrs/fscdr_test.go b/cdrs/fscdr_test.go index 0fd986bc5..b6ca2c473 100644 --- a/cdrs/fscdr_test.go +++ b/cdrs/fscdr_test.go @@ -21,8 +21,8 @@ package cdrs import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "testing" "reflect" + "testing" "time" ) @@ -100,28 +100,28 @@ func TestFsCdrAsRatedCdr(t *testing.T) { if err != nil { t.Errorf("Error loading cdr: %v", err) } - rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination", + rtCdrOut, err := fsCdr.AsRatedCdr("wholesale_run", "^"+utils.RATED, "^*out", "cgr_tenant", "cgr_tor", "cgr_account", "cgr_subject", "cgr_destination", "answer_epoch", "billsec", []string{"effective_caller_id_number"}, true) if err != nil { t.Error("Unexpected error received", err) } - expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", + expctRatedCdr := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", CdrSource: FS_CDR_SOURCE, ReqType: utils.RATED, - Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963", - AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4)*time.Second, + Direction: "*out", Tenant: "ipbx.itsyscom.com", TOR: "call", Account: "dan", Subject: "dan", Destination: "+4986517174963", + AnswerTime: time.Date(2013, 8, 4, 9, 50, 56, 0, time.UTC).Local(), Duration: time.Duration(4) * time.Second, ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut, expctRatedCdr) { t.Errorf("Received: %v, expected: %v", rtCdrOut, expctRatedCdr) } - rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination", + rtCdrOut2, err := fsCdr.AsRatedCdr("wholesale_run", "^postpaid", "^*in", "^cgrates.com", "^premium_call", "^first_account", "^first_subject", "cgr_destination", "^2013-12-07T08:42:26Z", "^12s", []string{"effective_caller_id_number"}, true) if err != nil { t.Error("Unexpected error received", err) } - expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", + expctRatedCdr2 := &utils.RatedCDR{CgrId: utils.FSCgrId("01df56f4-d99a-4ef6-b7fe-b924b2415b7f"), AccId: "01df56f4-d99a-4ef6-b7fe-b924b2415b7f", CdrHost: "127.0.0.1", CdrSource: FS_CDR_SOURCE, ReqType: "postpaid", - Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963", - AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12)*time.Second, + Direction: "*in", Tenant: "cgrates.com", TOR: "premium_call", Account: "first_account", Subject: "first_subject", Destination: "+4986517174963", + AnswerTime: time.Date(2013, 12, 7, 8, 42, 26, 0, time.UTC), Duration: time.Duration(12) * time.Second, ExtraFields: map[string]string{"effective_caller_id_number": "+4986517174960"}, MediationRunId: "wholesale_run", Cost: -1} if !reflect.DeepEqual(rtCdrOut2, expctRatedCdr2) { t.Errorf("Received: %v, expected: %v", rtCdrOut2, expctRatedCdr2) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 74457c6bf..dae6cde74 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -43,7 +43,6 @@ import ( ) const ( - INTERNAL = "internal" JSON = "json" GOB = "gob" POSTGRES = "postgres" @@ -67,6 +66,7 @@ var ( exitChan = make(chan bool) server = &engine.Server{} scribeServer history.Scribe + cdrServer *cdrs.CDRS sm sessionmanager.SessionManager medi *mediator.Mediator cfg *config.CGRConfig @@ -89,7 +89,7 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage, cacheChan, chanDone chan struct{}) { var connector engine.Connector - if cfg.MediatorRater == INTERNAL { + if cfg.MediatorRater == utils.INTERNAL { <-cacheChan // Cache needs to come up before we are ready connector = responder } else { @@ -120,8 +120,11 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD close(chanDone) } -func startCdrc() { - cdrc, err := cdrc.NewCdrc(cfg) +func startCdrc(cdrsChan chan struct{}) { + if cfg.CdrcCdrs == utils.INTERNAL { + <-cdrsChan // Wait for CDRServer to come up before start processing + } + cdrc, err := cdrc.NewCdrc(cfg, cdrServer) if err != nil { engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true @@ -135,7 +138,7 @@ func startCdrc() { func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) { var connector engine.Connector - if cfg.SMRater == INTERNAL { + if cfg.SMRater == utils.INTERNAL { <-cacheChan // Wait for the cache to init before start doing queries connector = responder } else { @@ -165,13 +168,12 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage } default: engine.Logger.Err(fmt.Sprintf(" Unsupported session manger type: %s!", cfg.SMSwitchType)) - exitChan <- true } exitChan <- true } func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) { - if cfg.CDRSMediator == INTERNAL { + if cfg.CDRSMediator == utils.INTERNAL { <-mediChan // Deadlock if mediator not started if medi == nil { engine.Logger.Crit(" Could not connect to mediator, exiting.") @@ -179,8 +181,8 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d return } } - cs := cdrs.New(cdrDb, medi, cfg) - cs.RegisterHanlersToServer(server) + cdrServer = cdrs.New(cdrDb, medi, cfg) + cdrServer.RegisterHanlersToServer(server) close(doneChan) } @@ -196,7 +198,7 @@ func startHistoryServer(chanDone chan struct{}) { // chanStartServer will report when server is up, useful for internal requests func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struct{}) { - if cfg.HistoryServer == INTERNAL { // For internal requests, wait for server to come online before connecting + if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) <-chanServerStarted // If server is not enabled, will have deadlock here } else { // Connect in iteration since there are chances of concurrency here @@ -243,11 +245,11 @@ func checkConfigSanity() error { engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!") return errors.New("Improperly configured balancer") } - if cfg.CDRSEnabled && cfg.CDRSMediator == INTERNAL && !cfg.MediatorEnabled { + if cfg.CDRSEnabled && cfg.CDRSMediator == utils.INTERNAL && !cfg.MediatorEnabled { engine.Logger.Crit("CDRS cannot connect to mediator, Mediator not enabled in configuration!") return errors.New("Internal Mediator required by CDRS") } - if cfg.HistoryServerEnabled && cfg.HistoryServer == INTERNAL && !cfg.HistoryServerEnabled { + if cfg.HistoryServerEnabled && cfg.HistoryServer == utils.INTERNAL && !cfg.HistoryServerEnabled { engine.Logger.Crit("The history agent is enabled and internal and history server is disabled!") return errors.New("Improperly configured history service") } @@ -310,14 +312,16 @@ func main() { var logDb engine.LogStorage var loadDb engine.LoadStorage var cdrDb engine.CdrStorage - ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding) + ratingDb, err = engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, + cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return } defer ratingDb.Close() engine.SetRatingStorage(ratingDb) - accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding) + accountDb, err = engine.ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, + cfg.AccountDBName, cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure getter database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return @@ -328,7 +332,8 @@ func main() { if cfg.StorDBType == SAME { logDb = ratingDb.(engine.LogStorage) } else { - logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding) + logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, + cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding) if err != nil { // Cannot configure logger database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) return @@ -370,7 +375,7 @@ func main() { responder := &engine.Responder{ExitChan: exitChan} apier := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Config: cfg} - if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != INTERNAL { + if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != utils.INTERNAL { engine.Logger.Info("Registering CGRateS Rater service") server.RpcRegister(responder) server.RpcRegister(apier) @@ -423,9 +428,10 @@ func main() { go startMediator(responder, logDb, cdrDb, cacheChan, medChan) } + var cdrsChan chan struct{} if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDRS service.") - cdrsChan := make(chan struct{}) + cdrsChan = make(chan struct{}) httpWait = append(httpWait, cdrsChan) go startCDRS(responder, cdrDb, medChan, cdrsChan) } @@ -439,7 +445,7 @@ func main() { if cfg.CdrcEnabled { engine.Logger.Info("Starting CGRateS CDR client.") - go startCdrc() + go startCdrc(cdrsChan) } // Start the servers @@ -447,6 +453,7 @@ func main() { go serveHttp(httpWait) <-exitChan + if *pidFile != "" { if err := os.Remove(*pidFile); err != nil { engine.Logger.Warning("Could not remove pid file: " + err.Error()) diff --git a/config/config.go b/config/config.go index 8e6779467..99140023d 100644 --- a/config/config.go +++ b/config/config.go @@ -29,7 +29,6 @@ import ( const ( DISABLED = "disabled" - INTERNAL = "internal" JSON = "json" GOB = "gob" POSTGRES = "postgres" @@ -183,7 +182,7 @@ func (self *CGRConfig) setDefaults() error { self.CdreExtraFields = []string{} self.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv" self.CdrcEnabled = false - self.CdrcCdrs = "127.0.0.1:2080" + self.CdrcCdrs = utils.INTERNAL self.CdrcCdrsMethod = "http_cgr" self.CdrcRunDelay = time.Duration(0) self.CdrcCdrType = "csv" diff --git a/config/config_test.go b/config/config_test.go index 17b8f8ad6..ffb138667 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -84,7 +84,7 @@ func TestDefaults(t *testing.T) { eCfg.CdreExtraFields = []string{} eCfg.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv" eCfg.CdrcEnabled = false - eCfg.CdrcCdrs = "127.0.0.1:2080" + eCfg.CdrcCdrs = utils.INTERNAL eCfg.CdrcCdrsMethod = "http_cgr" eCfg.CdrcRunDelay = time.Duration(0) eCfg.CdrcCdrType = "csv" diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index c6ee15a81..7d19767df 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -56,7 +56,7 @@ [cdrc] # enabled = false # Enable CDR client functionality -# cdrs = 127.0.0.1:2080 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify # cdr_type = csv # CDR file format . diff --git a/engine/storage_map.go b/engine/storage_map.go index 78127a4f5..02378749b 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -22,10 +22,10 @@ import ( "errors" "fmt" - "strings" - "time" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" + "strings" + "time" ) type MapStorage struct { diff --git a/utils/consts.go b/utils/consts.go index 3a6135335..8cadcd6ef 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -64,6 +64,7 @@ const ( JSON = "json" MSGPACK = "msgpack" CSV_LOAD = "CSVLOAD" + CGRID = "cgrid" ACCID = "accid" CDRHOST = "cdrhost" CDRSOURCE = "cdrsource" @@ -80,6 +81,7 @@ const ( STATIC_VALUE_PREFIX = "^" CDRE_CSV = "csv" CDRE_DRYRUN = "dry_run" + INTERNAL = "internal" ) var ( diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 51798d95e..3aeb15d2f 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -20,6 +20,7 @@ package utils import ( "time" + "net/url" ) func NewRatedCDRFromRawCDR(rawcdr RawCDR) (*RatedCDR, error) { @@ -127,3 +128,24 @@ func (ratedCdr *RatedCDR) GetExtraFields() map[string]string { func (ratedCdr *RatedCDR) AsRatedCdr(runId, reqTypeFld, directionFld, tenantFld, torFld, accountFld, subjectFld, destFld, answerTimeFld, durationFld string, extraFlds []string, fieldsMandatory bool) (*RatedCDR, error) { return ratedCdr, nil } + +// Converts part of the rated Cdr as httpForm used to post remotely to CDRS +func (ratedCdr *RatedCDR) AsRawCdrHttpForm() url.Values { + v := url.Values{} + v.Set(ACCID, ratedCdr.AccId) + v.Set(CDRHOST, ratedCdr.CdrHost) + v.Set(CDRSOURCE, ratedCdr.CdrSource) + v.Set(REQTYPE, ratedCdr.ReqType) + v.Set(DIRECTION, ratedCdr.Direction) + v.Set(TENANT, ratedCdr.Tenant) + v.Set(TOR, ratedCdr.TOR) + v.Set(ACCOUNT, ratedCdr.Account) + v.Set(SUBJECT, ratedCdr.Subject) + v.Set(DESTINATION, ratedCdr.Destination) + v.Set(ANSWER_TIME, ratedCdr.AnswerTime.String()) + //v.Set(DURATION, string(ratedCdr.Duration.Seconds())) + for fld, val := range ratedCdr.ExtraFields { + v.Set(fld, val) + } + return v +} From 21b4c108360d343bde1917c36e4e231effa783ce Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 26 Jan 2014 09:45:51 +0100 Subject: [PATCH 8/9] Tests - cdrc/RecordAsRatedCdr ratedcdr/AsRawCdrHttpForm --- cdrc/cdrc.go | 32 +++------------------------ cdrc/cdrc_test.go | 46 ++++++++++++++++++++++++++++---------- utils/ratedcdr.go | 5 +++-- utils/ratedcdr_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 42 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 87510bfc0..6710dbd5f 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -26,7 +26,6 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "os" "path" "strconv" @@ -118,33 +117,8 @@ func (self *Cdrc) parseFieldsConfig() error { } // Takes the record out of csv and turns it into http form which can be posted -func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { - // engine.Logger.Info(fmt.Sprintf("Processing record %v", record)) - v := url.Values{} - v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId) - for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { - var fieldVal string - if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) { - fieldVal = cfgFieldVal[1:] - } else if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) { - if cfgFieldIdx, err := strconv.Atoi(cfgFieldVal); err != nil { // Should in theory never happen since we have already parsed config - return nil, err - } else if len(record) <= cfgFieldIdx { - return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cfgFieldName) - } else { - fieldVal = record[cfgFieldIdx] - } - } else { // Modify here when we add more supported cdr formats - fieldVal = "UNKNOWN" - } - v.Set(cfgFieldName, fieldVal) - } - return v, nil -} - -// Takes the record out of csv and turns it into http form which can be posted -func (self *Cdrc) cdrAsRatedCdr(record []string) (*utils.RatedCDR, error) { - ratedCdr := &utils.RatedCDR{CdrSource: self.cgrCfg.CdrcSourceId} +func (self *Cdrc) recordAsRatedCdr(record []string) (*utils.RatedCDR, error) { + ratedCdr := &utils.RatedCDR{CdrSource: self.cgrCfg.CdrcSourceId, ExtraFields: map[string]string{}, Cost: -1} var err error for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { var fieldVal string @@ -254,7 +228,7 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue // Other csv related errors, ignore } - rawCdr, err := self.cdrAsRatedCdr(record) + rawCdr, err := self.recordAsRatedCdr(record) if err != nil { engine.Logger.Err(fmt.Sprintf(" Error in csv file: %s", err.Error())) continue diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 2ed6a9271..4678b1a74 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -21,7 +21,9 @@ package cdrc import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "reflect" "testing" + "time" ) func TestParseFieldsConfig(t *testing.T) { @@ -54,29 +56,51 @@ func TestParseFieldsConfig(t *testing.T) { } } -func TestCdrAsHttpForm(t *testing.T) { +func TestRecordAsRatedCdr(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() + cgrConfig.CdrcExtraFields = []string{"supplier:10"} cdrc := &Cdrc{cgrCfg: cgrConfig} if err := cdrc.parseFieldsConfig(); err != nil { t.Error("Failed parsing default fieldIndexesFromConfig", err) } cdrRow := []string{"firstField", "secondField"} - _, err := cdrc.cdrAsHttpForm(cdrRow) + _, err := cdrc.recordAsRatedCdr(cdrRow) if err == nil { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} - cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow) + rtCdr, err := cdrc.recordAsRatedCdr(cdrRow) if err != nil { - t.Error("Failed to parse CDR in form", err) + t.Error("Failed to parse CDR in rated cdr", err) } - if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId { - t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE)) + expectedCdr := &utils.RatedCDR{ + CgrId: utils.FSCgrId(cdrRow[0]), + AccId: cdrRow[0], + CdrSource: cgrConfig.CdrcSourceId, + ReqType: cdrRow[1], + Direction: cdrRow[2], + Tenant: cdrRow[3], + TOR: cdrRow[4], + Account: cdrRow[5], + Subject: cdrRow[6], + Destination: cdrRow[7], + AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), + Duration: time.Duration(62) * time.Second, + ExtraFields: map[string]string{"supplier": "supplier1"}, + Cost: -1, } - if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { - t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + if !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } - //if cdrAsForm.Get("supplier") != "supplier1" { - // t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) - //} + /* + if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId { + t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE)) + } + if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { + t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) + } + if cdrAsForm.Get("supplier") != "supplier1" { + t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) + } + */ } diff --git a/utils/ratedcdr.go b/utils/ratedcdr.go index 3aeb15d2f..da4846d78 100644 --- a/utils/ratedcdr.go +++ b/utils/ratedcdr.go @@ -19,8 +19,9 @@ along with this program. If not, see package utils import ( - "time" "net/url" + "strconv" + "time" ) func NewRatedCDRFromRawCDR(rawcdr RawCDR) (*RatedCDR, error) { @@ -143,7 +144,7 @@ func (ratedCdr *RatedCDR) AsRawCdrHttpForm() url.Values { v.Set(SUBJECT, ratedCdr.Subject) v.Set(DESTINATION, ratedCdr.Destination) v.Set(ANSWER_TIME, ratedCdr.AnswerTime.String()) - //v.Set(DURATION, string(ratedCdr.Duration.Seconds())) + v.Set(DURATION, strconv.FormatFloat(ratedCdr.Duration.Seconds(), 'f', -1, 64)) for fld, val := range ratedCdr.ExtraFields { v.Set(fld, val) } diff --git a/utils/ratedcdr_test.go b/utils/ratedcdr_test.go index d3ff122e5..7802924d9 100644 --- a/utils/ratedcdr_test.go +++ b/utils/ratedcdr_test.go @@ -98,3 +98,53 @@ func TestRatedCdrFields(t *testing.T) { t.Error("Error parsing cdr: ", ratedCdr) } } + +func TestAsRawCdrHttpForm(t *testing.T) { + ratedCdr := RatedCDR{CgrId: FSCgrId("dsafdsaf"), AccId: "dsafdsaf", CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", + TOR: "call", Account: "1001", Subject: "1001", Destination: "1002", AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + Duration: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, + } + cdrForm := ratedCdr.AsRawCdrHttpForm() + if cdrForm.Get(ACCID) != ratedCdr.AccId { + t.Errorf("Expected: %s, received: %s", ratedCdr.AccId, cdrForm.Get(ACCID)) + } + if cdrForm.Get(CDRHOST) != ratedCdr.CdrHost { + t.Errorf("Expected: %s, received: %s", ratedCdr.CdrHost, cdrForm.Get(CDRHOST)) + } + if cdrForm.Get(CDRSOURCE) != ratedCdr.CdrSource { + t.Errorf("Expected: %s, received: %s", ratedCdr.CdrSource, cdrForm.Get(CDRSOURCE)) + } + if cdrForm.Get(REQTYPE) != ratedCdr.ReqType { + t.Errorf("Expected: %s, received: %s", ratedCdr.ReqType, cdrForm.Get(REQTYPE)) + } + if cdrForm.Get(DIRECTION) != ratedCdr.Direction { + t.Errorf("Expected: %s, received: %s", ratedCdr.Direction, cdrForm.Get(DIRECTION)) + } + if cdrForm.Get(TENANT) != ratedCdr.Tenant { + t.Errorf("Expected: %s, received: %s", ratedCdr.Tenant, cdrForm.Get(TENANT)) + } + if cdrForm.Get(TOR) != ratedCdr.TOR { + t.Errorf("Expected: %s, received: %s", ratedCdr.TOR, cdrForm.Get(TOR)) + } + if cdrForm.Get(ACCOUNT) != ratedCdr.Account { + t.Errorf("Expected: %s, received: %s", ratedCdr.Account, cdrForm.Get(ACCOUNT)) + } + if cdrForm.Get(SUBJECT) != ratedCdr.Subject { + t.Errorf("Expected: %s, received: %s", ratedCdr.Subject, cdrForm.Get(SUBJECT)) + } + if cdrForm.Get(DESTINATION) != ratedCdr.Destination { + t.Errorf("Expected: %s, received: %s", ratedCdr.Destination, cdrForm.Get(DESTINATION)) + } + if cdrForm.Get(ANSWER_TIME) != "2013-11-07 08:42:26 +0000 UTC" { + t.Errorf("Expected: %s, received: %s", "2013-11-07 08:42:26 +0000 UTC", cdrForm.Get(ANSWER_TIME)) + } + if cdrForm.Get(DURATION) != "10" { + t.Errorf("Expected: %s, received: %s", "10", cdrForm.Get(DURATION)) + } + if cdrForm.Get("field_extr1") != ratedCdr.ExtraFields["field_extr1"] { + t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["field_extr1"], cdrForm.Get("field_extr1")) + } + if cdrForm.Get("fieldextr2") != ratedCdr.ExtraFields["fieldextr2"] { + t.Errorf("Expected: %s, received: %s", ratedCdr.ExtraFields["fieldextr2"], cdrForm.Get("fieldextr2")) + } +} From 6a251c2b2c90716155c7bcc76238acb6ebb7c6c0 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 26 Jan 2014 10:33:02 +0100 Subject: [PATCH 9/9] Tutorials config updates for rc4 --- data/conf/cgrates.cfg | 2 +- .../fs_csv/cgrates/etc/cgrates/cgrates.cfg | 30 +++++----- .../cgrates/tariffplans/AccountActions.csv | 2 +- .../fs_json/cgrates/etc/cgrates/cgrates.cfg | 56 ++++++++++--------- .../cgrates/tariffplans/AccountActions.csv | 2 +- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 7d19767df..9b14678c7 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -39,7 +39,7 @@ [rater] # enabled = false # Enable RaterCDRSExportPath service: . -# balancer = # Register to Balancer as worker: <""|127.0.0.1:2013>. +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] # enabled = false # Starts Scheduler service: . diff --git a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg index 5ee293764..a3fe952ec 100644 --- a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg @@ -24,11 +24,13 @@ # stordb_user = cgrates # Username to use when connecting to stordb. # stordb_passwd = CGRateS.org # Password to use when connecting to stordb. # dbdata_encoding = msgpack # The encoding used to store object data in strings: -# rpc_encoding = json # RPC encoding used on APIs: . +# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address +# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address +# http_listen = 127.0.0.1:2080 # HTTP listening address # default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. +# default_tor = call # Default Type of Record to consider when missing from requests. +# default_tenant = cgrates.org # Default Tenant to consider when missing from requests. +# default_subject = cgrates # Default rating Subject to consider when missing from requests. # rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> # rounding_decimals = 4 # Number of decimals to round float/costs at @@ -38,15 +40,13 @@ [rater] enabled = true # Enable RaterCDRSExportPath service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] enabled = true # Starts Scheduler service: . [cdrs] enabled = true # Start the CDR Server service: . -# listen=127.0.0.1:2022 # CDRS's listening interface: . # extra_fields = # Extra fields to store in CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> @@ -57,7 +57,7 @@ export_dir = /tmp # Path where the exported CDRs will be placed [cdrc] enabled = true # Enable CDR client functionality -# cdrs = 127.0.0.1:2022 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify cdr_type = freeswitch_csv # CDR file format . @@ -78,8 +78,7 @@ extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv [mediator] enabled = true # Starts Mediator service: . -# listen=internal # Mediator's listening interface: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater: +# rater = internal # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. # run_ids = # Identifiers of each extra mediation to run on CDRs # reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. @@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: . [session_manager] enabled = true # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# rater = internal # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. # debit_interval = 5 # Interval to perform debits on. @@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: . [history_server] enabled = true # Starts History service: . -# listen = 127.0.0.1:2013 # Listening addres for history server: history_dir = /tmp/cgr_history # Location on disk where to store history files. # save_interval = 1s # Interval to save changed cache into .git archive [history_agent] # enabled = false # Starts History as a client: . -# server = 127.0.0.1:2013 # Address where to reach the master history server: +# server = internal # Address where to reach the master history server: + +[mailer] +# server = localhost # The server to use when sending emails out +# auth_user = cgrates # Authenticate to email server using this user +# auth_passwd = CGRateS.org # Authenticate to email server with this password +# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out diff --git a/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv b/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv index 455f5318c..e00c681bf 100644 --- a/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv +++ b/data/tutorials/fs_csv/cgrates/tariffplans/AccountActions.csv @@ -1,4 +1,4 @@ -#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag +#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS diff --git a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg index 1ac42f06e..a40598797 100644 --- a/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_json/cgrates/etc/cgrates/cgrates.cfg @@ -24,11 +24,13 @@ # stordb_user = cgrates # Username to use when connecting to stordb. # stordb_passwd = CGRateS.org # Password to use when connecting to stordb. # dbdata_encoding = msgpack # The encoding used to store object data in strings: -# rpc_encoding = json # RPC encoding used on APIs: . +# rpc_json_listen = 127.0.0.1:2012 # RPC JSON listening address +# rpc_gob_listen = 127.0.0.1:2013 # RPC GOB listening address +# http_listen = 127.0.0.1:2080 # HTTP listening address # default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. +# default_tor = call # Default Type of Record to consider when missing from requests. +# default_tenant = cgrates.org # Default Tenant to consider when missing from requests. +# default_subject = cgrates # Default rating Subject to consider when missing from requests. # rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> # rounding_decimals = 4 # Number of decimals to round float/costs at @@ -38,15 +40,13 @@ [rater] enabled = true # Enable RaterCDRSExportPath service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . +# balancer = # Register to Balancer as worker: <""|internal|127.0.0.1:2013>. [scheduler] enabled = true # Starts Scheduler service: . [cdrs] enabled = true # Start the CDR Server service: . -# listen=127.0.0.1:2022 # CDRS's listening interface: . # extra_fields = # Extra fields to store in CDRs mediator = internal # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> @@ -57,29 +57,28 @@ export_dir = /tmp # Path where the exported CDRs will be placed [cdrc] # enabled = false # Enable CDR client functionality -# cdrs = 127.0.0.1:2022 # Address where to reach CDR server +# cdrs = internal # Address where to reach CDR server. # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify -# cdr_type = freeswitch_csv # CDR file format . +# cdr_type = csv # CDR file format . # cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. -# cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved. +# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved. # cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. -# accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs. -# reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs. -# direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs. -# tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs. -# tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs. -# account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs. -# subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs. -# destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs. -# answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs. -# duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: : +# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. +# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. +# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs. +# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs. +# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs. +# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. +# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. +# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. +# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. +# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. +# extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] enabled = true # Starts Mediator service: . -# listen=internal # Mediator's listening interface: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater: +# rater = internal # Address where to reach the Rater: # rater_reconnects = 3 # Number of reconnects to rater before giving up. # run_ids = # Identifiers of each extra mediation to run on CDRs # reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs. @@ -95,7 +94,7 @@ enabled = true # Starts Mediator service: . [session_manager] enabled = true # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# rater = internal # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. # debit_interval = 5 # Interval to perform debits on. @@ -106,11 +105,16 @@ enabled = true # Starts SessionManager service: . [history_server] enabled = true # Starts History service: . -# listen = 127.0.0.1:2013 # Listening addres for history server: history_dir = /tmp/cgr_history # Location on disk where to store history files. # save_interval = 1s # Interval to save changed cache into .git archive [history_agent] # enabled = false # Starts History as a client: . -# server = 127.0.0.1:2013 # Address where to reach the master history server: +# server = internal # Address where to reach the master history server: + +[mailer] +# server = localhost # The server to use when sending emails out +# auth_user = cgrates # Authenticate to email server using this user +# auth_passwd = CGRateS.org # Authenticate to email server with this password +# from_address = cgr-mailer@localhost.localdomain # From address used when sending emails out diff --git a/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv b/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv index 455f5318c..e00c681bf 100644 --- a/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv +++ b/data/tutorials/fs_json/cgrates/tariffplans/AccountActions.csv @@ -1,4 +1,4 @@ -#Tenant,Account,Direction,ActionTimingsTag,ActionTriggersTag +#Tenant,Account,Direction,ActionPlanTag,ActionTriggersTag cgrates.org,1001,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1002,*out,PREPAID_10,STANDARD_TRIGGERS cgrates.org,1003,*out,PREPAID_10,STANDARD_TRIGGERS