From 88db48470d5af4cbcda9de2dbcc0fba23c54dc0a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 4 Aug 2013 15:39:46 +0300 Subject: [PATCH] integrated hstory into cgr-engine command --- cmd/cgr-engine/cgr-engine.go | 79 +++++++++++++++++++++++++++--------- config/config.go | 34 ++++++++++------ config/config_test.go | 16 +++++--- config/test_data.txt | 12 ++++-- data/conf/cgrates.cfg | 62 ++++++++++++++++------------ engine/storage_mongo.go | 6 +++ engine/storage_redis.go | 9 ++++ history/file_scribe.go | 19 +++++---- 8 files changed, 161 insertions(+), 76 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 35152de5c..779398b4e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -201,34 +201,73 @@ func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) { exitChan <- true } +func startHistoryScribe() { + var scribeServer history.Scribe + flag.Parse() + + if cfg.HistoryServerEnabled { + if scribeServer, err = history.NewFileScribe(cfg.HistoryPath); err != nil { + engine.Logger.Crit(err.Error()) + exitChan <- true + return + } + if cfg.HistoryListen != INTERNAL { + rpc.RegisterName("Scribe", scribeServer) + var serveFunc func(io.ReadWriteCloser) + if cfg.RPCEncoding == JSON { + serveFunc = jsonrpc.ServeConn + } else { + serveFunc = rpc.ServeConn + } + l, err := net.Listen("tcp", cfg.HistoryListen) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not listen to %v: %v", cfg.HistoryListen, err)) + exitChan <- true + return + } + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Accept error: %v", conn)) + continue + } + + engine.Logger.Info(fmt.Sprintf(" New incoming connection: %v", conn.RemoteAddr())) + go serveFunc(conn) + } + } + } + var scribeAgent history.Scribe + + if cfg.HistoryAgentEnabled { + if cfg.HistoryServer != INTERNAL { + if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer); err != nil { + engine.Logger.Crit(err.Error()) + exitChan <- true + return + } + } else { + scribeAgent = scribeServer + } + } + engine.SetHistoryScribe(scribeAgent) + return +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED { engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!") return errors.New("SessionManager on Worker") } if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED { - engine.Logger.Crit("The balancer is enabled so it cannot connect to anatoher balancer (change [engine]/balancer to disabled)!") + engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change [engine]/balancer to disabled)!") return errors.New("Improperly configured balancer") } - - return nil -} - -func startHistoryScribe() (err error) { - var scribe history.Scribe - flag.Parse() - if cfg.HistoryMaster != "" { - scribe, err = history.NewProxyScribe(cfg.HistoryMaster) - } else { - scribe, err = history.NewFileScribe(cfg.HistoryRoot) + if cfg.HistoryServerEnabled && cfg.HistoryServer == INTERNAL && !cfg.HistoryServerEnabled { + engine.Logger.Crit("The history agent is enabled and internal and history server is disabled!") + return errors.New("Improperly configured history service") } - rpc.RegisterName("Scribe", scribe) - rpc.HandleHTTP() - _, e := net.Listen("tcp", ":1234") - if e != nil { - return err - } - //http.Serve(l, nil) return nil } @@ -328,7 +367,7 @@ func main() { engine.Logger.Info("Starting CGRateS CDR Server.") go startCDRS(responder, loggerDb) } - if cfg.HistoryEnabled { + if cfg.HistoryServerEnabled || cfg.HistoryAgentEnabled { engine.Logger.Info("Starting History Service.") go startHistoryScribe() } diff --git a/config/config.go b/config/config.go index e4489745c..49c227c92 100644 --- a/config/config.go +++ b/config/config.go @@ -94,9 +94,11 @@ type CGRConfig struct { FreeswitchServer string // freeswitch address host:port FreeswitchPass string // FS socket password FreeswitchReconnects int // number of times to attempt reconnect after connect fails - HistoryEnabled bool // Starts History service: . - HistoryMaster string // Address where to reach the master history server: - HistoryRoot string // Location on disk where to store hostory files. + HistoryAgentEnabled bool // Starts History as an agent: . + HistoryServerEnabled bool // Starts History as server: . + HistoryServer string // Address where to reach the master history server: + HistoryListen string // History server listening interface: + HistoryPath string // Location on disk where to store history files. } func (self *CGRConfig) setDefaults() error { @@ -155,9 +157,11 @@ func (self *CGRConfig) setDefaults() error { self.FreeswitchServer = "127.0.0.1:8021" self.FreeswitchPass = "ClueCon" self.FreeswitchReconnects = 5 - self.HistoryEnabled = false - self.HistoryMaster = "127.0.0.1:2013" - self.HistoryRoot = "/var/log/cgrates/history" + self.HistoryAgentEnabled = false + self.HistoryServerEnabled = false + self.HistoryServer = "127.0.0.1:2013" + self.HistoryListen = "127.0.0.1:2013" + self.HistoryPath = "/var/log/cgrates/history" return nil } @@ -375,14 +379,20 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("freeswitch", "reconnects"); hasOpt { cfg.FreeswitchReconnects, _ = c.GetInt("freeswitch", "reconnects") } - if hasOpt = c.HasOption("history", "enabled"); hasOpt { - cfg.HistoryEnabled, _ = c.GetBool("history", "enabled") + if hasOpt = c.HasOption("history_agent", "enabled"); hasOpt { + cfg.HistoryAgentEnabled, _ = c.GetBool("history_agent", "enabled") } - if hasOpt = c.HasOption("history", "master"); hasOpt { - cfg.HistoryMaster, _ = c.GetString("history", "master") + if hasOpt = c.HasOption("history_agent", "server"); hasOpt { + cfg.HistoryServer, _ = c.GetString("history_agent", "server") } - if hasOpt = c.HasOption("history", "root"); hasOpt { - cfg.HistoryRoot, _ = c.GetString("history", "root") + if hasOpt = c.HasOption("history_server", "enabled"); hasOpt { + cfg.HistoryServerEnabled, _ = c.GetBool("history_server", "enabled") + } + if hasOpt = c.HasOption("history_server", "listen"); hasOpt { + cfg.HistoryListen, _ = c.GetString("history_server", "listen") + } + if hasOpt = c.HasOption("history_server", "path"); hasOpt { + cfg.HistoryPath, _ = c.GetString("history_server", "path") } return cfg, nil } diff --git a/config/config_test.go b/config/config_test.go index db10d8fdf..1ab755a3e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -89,9 +89,11 @@ func TestDefaults(t *testing.T) { eCfg.FreeswitchServer = "127.0.0.1:8021" eCfg.FreeswitchPass = "ClueCon" eCfg.FreeswitchReconnects = 5 - eCfg.HistoryEnabled = false - eCfg.HistoryMaster = "127.0.0.1:2013" - eCfg.HistoryRoot = "/var/log/cgrates/history" + eCfg.HistoryAgentEnabled = false + eCfg.HistoryServer = "127.0.0.1:2013" + eCfg.HistoryServerEnabled = false + eCfg.HistoryListen = "127.0.0.1:2013" + eCfg.HistoryPath = "/var/log/cgrates/history" if !reflect.DeepEqual(cfg, eCfg) { t.Log(eCfg) t.Log(cfg) @@ -183,9 +185,11 @@ func TestConfigFromFile(t *testing.T) { eCfg.FreeswitchServer = "test" eCfg.FreeswitchPass = "test" eCfg.FreeswitchReconnects = 99 - eCfg.HistoryEnabled = true - eCfg.HistoryMaster = "test" - eCfg.HistoryRoot = "test" + eCfg.HistoryAgentEnabled = true + eCfg.HistoryServer = "test" + eCfg.HistoryServerEnabled = true + eCfg.HistoryListen = "test" + eCfg.HistoryPath = "test" if !reflect.DeepEqual(cfg, eCfg) { t.Log(eCfg) t.Log(cfg) diff --git a/config/test_data.txt b/config/test_data.txt index 0fe85c212..f7ce00870 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -33,7 +33,7 @@ balancer = test # Register to Balancer as worker: . listen = test # Rater's listening interface: . [scheduler] - enabled = true # Starts Scheduler service: . +enabled = true # Starts Scheduler service: . [cdrs] listen=test # CDRS's listening interface: . @@ -73,7 +73,11 @@ server = test # Adress where to connect to FreeSWITCH socket. passwd = test # FreeSWITCH socket password. reconnects = 99 # Number of attempts on connect failure. -[history] +[history_agent] +enabled = true # Starts History as a client: . +server = test # Address where to reach the master history server: + +[history_server] enabled = true # Starts History service: . -master = test # Address where to reach the master history server: -root = test # Location on disk where to store hostory files. \ No newline at end of file +listen = test # Listening addres for history server: +path = test # Location on disk where to store history files. \ No newline at end of file diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 72b70a6f1..447db28d8 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -5,44 +5,44 @@ # [global] must exist in all files, rest of the configuration is inter-changeable. [global] -# datadb_type = redis # The main database: . -# datadb_host = 127.0.0.1 # Database host address. -# datadb_port = 6379 # Port to reach the database. +# datadb_type = redis # The main database: . +# datadb_host = 127.0.0.1 # Database host address. +# datadb_port = 6379 # Port to reach the database. # datadb_name = 10 # The name of the database to connect to. # datadb_user = # Username to use when connecting to database. # datadb_passwd = # Password to use when connecting to database. -# stordb_type = mysql # Log/stored database type to use: -# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets. -# stordb_port = 3306 # The port to reach the logdb. -# stordb_name = cgrates # The name of the log database to connect to. -# stordb_user = cgrates # Username to use when connecting to stordb. -# stordb_passwd = CGRateS.org # Password to use when connecting to stordb. -# rpc_encoding = json # RPC encoding used on APIs: . -# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. +# stordb_type = mysql # Log/stored database type to use: +# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets. +# stordb_port = 3306 # The port to reach the logdb. +# stordb_name = cgrates # The name of the log database to connect to. +# stordb_user = cgrates # Username to use when connecting to stordb. +# stordb_passwd = CGRateS.org # Password to use when connecting to stordb. +# rpc_encoding = json # RPC encoding used on APIs: . +# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>. # default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. -# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> -# rounding_decimals = 4 # Number of decimals to round float/costs at +# default_tenant = 0 # Default Tenant to consider when missing from requests. +# default_subject = 0 # Default rating Subject to consider when missing from requests. +# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down> +# rounding_decimals = 4 # Number of decimals to round float/costs at [balancer] # enabled = false # Start Balancer service: . -# listen = 127.0.0.1:2012 # Balancer listen interface: . +# listen = 127.0.0.1:2012 # Balancer listen interface: . [rater] # enabled = false # Enable Rater service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . +# balancer = disabled # Register to Balancer as worker: . +# listen = 127.0.0.1:2012 # Rater's listening interface: . [scheduler] # enabled = false # Starts Scheduler service: . [cdrs] # listen=127.0.0.1:2022 # CDRS's listening interface: . -# freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: . -# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> -# extra_fields = # Extra fields to store in CDRs +# freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: . +# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> +# extra_fields = # Extra fields to store in CDRs [mediator] # enabled = false # Starts Mediator service: . @@ -54,9 +54,9 @@ # subject_fields = subject # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs. # reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs. # direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs. -# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. -# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. -# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. +# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs. +# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs. +# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs. # destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs. # time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs. # duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. @@ -64,7 +64,7 @@ # cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs). [session_manager] -# enabled = false # Starts SessionManager service: . +# enabled = false # Starts SessionManager service: . # switch_type = freeswitch # Defines the type of switch behind: . # rater = 127.0.0.1:2012 # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. @@ -72,6 +72,14 @@ [freeswitch] # server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. -# passwd = ClueCon # FreeSWITCH socket password. -# reconnects = 5 # Number of attempts on connect failure. +# passwd = ClueCon # FreeSWITCH socket password. +# reconnects = 5 # Number of attempts on connect failure. +[history_agent] +#enabled = false # Starts History as a client: . +#server = 127.0.0.1:2013 # Address where to reach the master history server: + +[history_server] +#enabled = false # Starts History service: . +#listen = 127.0.0.1:2013 # Listening addres for history server: +#path = /var/log/cgrates/history # Location on disk where to store history files. diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index a555c4744..340526ed1 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -131,6 +131,9 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err } func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { + if historyScribe != nil { + go historyScribe.Record(RATING_PROFILE_PREFIX+rp.Id, rp) + } return ms.db.C("ratingprofiles").Insert(rp) } @@ -144,6 +147,9 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err } func (ms *MongoStorage) SetDestination(dest *Destination) error { + if historyScribe != nil { + go historyScribe.Record(DESTINATION_PREFIX+dest.Id, dest) + } return ms.db.C("destinations").Insert(dest) } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 31080dd53..dfd6a88f0 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "github.com/cgrates/cgrates/utils" + "log" "menteslibres.net/gosexy/redis" "strconv" "strings" @@ -80,6 +81,9 @@ func (rs *RedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err err func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) { result, err := rs.ms.Marshal(rp) _, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) + if err != nil && historyScribe != nil { + go historyScribe.Record(RATING_PROFILE_PREFIX+rp.Id, rp) + } return } @@ -98,6 +102,11 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { return } _, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, result) + log.Print("before") + if err != nil && historyScribe != nil { + log.Print("scribe: ", historyScribe) + go historyScribe.Record(DESTINATION_PREFIX+dest.Id, dest) + } return } diff --git a/history/file_scribe.go b/history/file_scribe.go index dcc0fb52b..486de3833 100644 --- a/history/file_scribe.go +++ b/history/file_scribe.go @@ -75,6 +75,11 @@ func (s *FileScribe) Record(key string, obj interface{}) error { } func (s *FileScribe) gitInit() error { + if _, err := os.Stat(s.fileRoot); os.IsNotExist(err) { + if err := os.MkdirAll(s.fileRoot, os.ModeDir|0755); err != nil { + return errors.New(" Error creating history folder: " + err.Error()) + } + } if _, err := os.Stat(filepath.Join(s.fileRoot, ".git")); os.IsNotExist(err) { cmd := exec.Command(s.gitCommand, "init") cmd.Dir = s.fileRoot @@ -82,16 +87,16 @@ func (s *FileScribe) gitInit() error { return errors.New(string(out) + " " + err.Error()) } if f, err := os.Create(filepath.Join(s.fileRoot, DESTINATIONS_FILE)); err != nil { - return err + return errors.New(" Error writing destinations file: " + err.Error()) } else { f.Close() } if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PROFILES_FILE)); err != nil { - return err + return errors.New(" Error writing rating profiles file: " + err.Error()) } else { f.Close() } - cmd = exec.Command(s.gitCommand, "add") + cmd = exec.Command(s.gitCommand, "add", ".") cmd.Dir = s.fileRoot if out, err := cmd.Output(); err != nil { return errors.New(string(out) + " " + err.Error()) @@ -119,13 +124,13 @@ func (s *FileScribe) load(filename string) error { switch filename { case DESTINATIONS_FILE: - if err := d.Decode(&s.destinations); err != nil { - return err + if err := d.Decode(&s.destinations); err != nil && err != io.EOF { + return errors.New(" Error loading destinations: " + err.Error()) } s.destinations.Sort() case RATING_PROFILES_FILE: - if err := d.Decode(&s.ratingProfiles); err != nil { - return err + if err := d.Decode(&s.ratingProfiles); err != nil && err != io.EOF { + return errors.New(" Error loading rating profiles: " + err.Error()) } s.ratingProfiles.Sort() }