integrated hstory into cgr-engine command

This commit is contained in:
Radu Ioan Fericean
2013-08-04 15:39:46 +03:00
parent 864b7eca5a
commit 88db48470d
8 changed files with 161 additions and 76 deletions

View File

@@ -201,34 +201,73 @@ func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) {
exitChan <- true
}
func startHistoryScribe() {
var scribeServer history.Scribe
flag.Parse()
if cfg.HistoryServerEnabled {
if scribeServer, err = history.NewFileScribe(cfg.HistoryPath); err != nil {
engine.Logger.Crit(err.Error())
exitChan <- true
return
}
if cfg.HistoryListen != INTERNAL {
rpc.RegisterName("Scribe", scribeServer)
var serveFunc func(io.ReadWriteCloser)
if cfg.RPCEncoding == JSON {
serveFunc = jsonrpc.ServeConn
} else {
serveFunc = rpc.ServeConn
}
l, err := net.Listen("tcp", cfg.HistoryListen)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<History> Could not listen to %v: %v", cfg.HistoryListen, err))
exitChan <- true
return
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
engine.Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
continue
}
engine.Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
go serveFunc(conn)
}
}
}
var scribeAgent history.Scribe
if cfg.HistoryAgentEnabled {
if cfg.HistoryServer != INTERNAL {
if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer); err != nil {
engine.Logger.Crit(err.Error())
exitChan <- true
return
}
} else {
scribeAgent = scribeServer
}
}
engine.SetHistoryScribe(scribeAgent)
return
}
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
engine.Logger.Crit("The balancer is enabled so it cannot connect to anatoher balancer (change [engine]/balancer to disabled)!")
engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change [engine]/balancer to disabled)!")
return errors.New("Improperly configured balancer")
}
return nil
}
func startHistoryScribe() (err error) {
var scribe history.Scribe
flag.Parse()
if cfg.HistoryMaster != "" {
scribe, err = history.NewProxyScribe(cfg.HistoryMaster)
} else {
scribe, err = history.NewFileScribe(cfg.HistoryRoot)
if cfg.HistoryServerEnabled && cfg.HistoryServer == INTERNAL && !cfg.HistoryServerEnabled {
engine.Logger.Crit("The history agent is enabled and internal and history server is disabled!")
return errors.New("Improperly configured history service")
}
rpc.RegisterName("Scribe", scribe)
rpc.HandleHTTP()
_, e := net.Listen("tcp", ":1234")
if e != nil {
return err
}
//http.Serve(l, nil)
return nil
}
@@ -328,7 +367,7 @@ func main() {
engine.Logger.Info("Starting CGRateS CDR Server.")
go startCDRS(responder, loggerDb)
}
if cfg.HistoryEnabled {
if cfg.HistoryServerEnabled || cfg.HistoryAgentEnabled {
engine.Logger.Info("Starting History Service.")
go startHistoryScribe()
}

View File

@@ -94,9 +94,11 @@ type CGRConfig struct {
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
HistoryEnabled bool // Starts History service: <true|false>.
HistoryMaster string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryRoot string // Location on disk where to store hostory files.
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryListen string // History server listening interface: <internal|x.y.z.y:1234>
HistoryPath string // Location on disk where to store history files.
}
func (self *CGRConfig) setDefaults() error {
@@ -155,9 +157,11 @@ func (self *CGRConfig) setDefaults() error {
self.FreeswitchServer = "127.0.0.1:8021"
self.FreeswitchPass = "ClueCon"
self.FreeswitchReconnects = 5
self.HistoryEnabled = false
self.HistoryMaster = "127.0.0.1:2013"
self.HistoryRoot = "/var/log/cgrates/history"
self.HistoryAgentEnabled = false
self.HistoryServerEnabled = false
self.HistoryServer = "127.0.0.1:2013"
self.HistoryListen = "127.0.0.1:2013"
self.HistoryPath = "/var/log/cgrates/history"
return nil
}
@@ -375,14 +379,20 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("freeswitch", "reconnects"); hasOpt {
cfg.FreeswitchReconnects, _ = c.GetInt("freeswitch", "reconnects")
}
if hasOpt = c.HasOption("history", "enabled"); hasOpt {
cfg.HistoryEnabled, _ = c.GetBool("history", "enabled")
if hasOpt = c.HasOption("history_agent", "enabled"); hasOpt {
cfg.HistoryAgentEnabled, _ = c.GetBool("history_agent", "enabled")
}
if hasOpt = c.HasOption("history", "master"); hasOpt {
cfg.HistoryMaster, _ = c.GetString("history", "master")
if hasOpt = c.HasOption("history_agent", "server"); hasOpt {
cfg.HistoryServer, _ = c.GetString("history_agent", "server")
}
if hasOpt = c.HasOption("history", "root"); hasOpt {
cfg.HistoryRoot, _ = c.GetString("history", "root")
if hasOpt = c.HasOption("history_server", "enabled"); hasOpt {
cfg.HistoryServerEnabled, _ = c.GetBool("history_server", "enabled")
}
if hasOpt = c.HasOption("history_server", "listen"); hasOpt {
cfg.HistoryListen, _ = c.GetString("history_server", "listen")
}
if hasOpt = c.HasOption("history_server", "path"); hasOpt {
cfg.HistoryPath, _ = c.GetString("history_server", "path")
}
return cfg, nil
}

View File

@@ -89,9 +89,11 @@ func TestDefaults(t *testing.T) {
eCfg.FreeswitchServer = "127.0.0.1:8021"
eCfg.FreeswitchPass = "ClueCon"
eCfg.FreeswitchReconnects = 5
eCfg.HistoryEnabled = false
eCfg.HistoryMaster = "127.0.0.1:2013"
eCfg.HistoryRoot = "/var/log/cgrates/history"
eCfg.HistoryAgentEnabled = false
eCfg.HistoryServer = "127.0.0.1:2013"
eCfg.HistoryServerEnabled = false
eCfg.HistoryListen = "127.0.0.1:2013"
eCfg.HistoryPath = "/var/log/cgrates/history"
if !reflect.DeepEqual(cfg, eCfg) {
t.Log(eCfg)
t.Log(cfg)
@@ -183,9 +185,11 @@ func TestConfigFromFile(t *testing.T) {
eCfg.FreeswitchServer = "test"
eCfg.FreeswitchPass = "test"
eCfg.FreeswitchReconnects = 99
eCfg.HistoryEnabled = true
eCfg.HistoryMaster = "test"
eCfg.HistoryRoot = "test"
eCfg.HistoryAgentEnabled = true
eCfg.HistoryServer = "test"
eCfg.HistoryServerEnabled = true
eCfg.HistoryListen = "test"
eCfg.HistoryPath = "test"
if !reflect.DeepEqual(cfg, eCfg) {
t.Log(eCfg)
t.Log(cfg)

View File

@@ -33,7 +33,7 @@ balancer = test # Register to Balancer as worker: <enabled|disabled>.
listen = test # Rater's listening interface: <internal|x.y.z.y:1234>.
[scheduler]
enabled = true # Starts Scheduler service: <true|false>.
enabled = true # Starts Scheduler service: <true|false>.
[cdrs]
listen=test # CDRS's listening interface: <x.y.z.y:1234>.
@@ -73,7 +73,11 @@ server = test # Adress where to connect to FreeSWITCH socket.
passwd = test # FreeSWITCH socket password.
reconnects = 99 # Number of attempts on connect failure.
[history]
[history_agent]
enabled = true # Starts History as a client: <true|false>.
server = test # Address where to reach the master history server: <internal|x.y.z.y:1234>
[history_server]
enabled = true # Starts History service: <true|false>.
master = test # Address where to reach the master history server: <internal|x.y.z.y:1234>
root = test # Location on disk where to store hostory files.
listen = test # Listening addres for history server: <internal|x.y.z.y:1234>
path = test # Location on disk where to store history files.

View File

@@ -5,44 +5,44 @@
# [global] must exist in all files, rest of the configuration is inter-changeable.
[global]
# datadb_type = redis # The main database: <redis>.
# datadb_host = 127.0.0.1 # Database host address.
# datadb_port = 6379 # Port to reach the database.
# datadb_type = redis # The main database: <redis>.
# datadb_host = 127.0.0.1 # Database host address.
# datadb_port = 6379 # Port to reach the database.
# datadb_name = 10 # The name of the database to connect to.
# datadb_user = # Username to use when connecting to database.
# datadb_passwd = # Password to use when connecting to database.
# stordb_type = mysql # Log/stored database type to use: <same|postgres|mongo|redis|mysql>
# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
# stordb_port = 3306 # The port to reach the logdb.
# stordb_name = cgrates # The name of the log database to connect to.
# stordb_user = cgrates # Username to use when connecting to stordb.
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
# stordb_type = mysql # Log/stored database type to use: <same|postgres|mongo|redis|mysql>
# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
# stordb_port = 3306 # The port to reach the logdb.
# stordb_name = cgrates # The name of the log database to connect to.
# stordb_user = cgrates # Username to use when connecting to stordb.
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
# default_tor = 0 # Default Type of Record to consider when missing from requests.
# default_tenant = 0 # Default Tenant to consider when missing from requests.
# default_subject = 0 # Default rating Subject to consider when missing from requests.
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
# default_tenant = 0 # Default Tenant to consider when missing from requests.
# default_subject = 0 # Default rating Subject to consider when missing from requests.
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
[balancer]
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
[rater]
# enabled = false # Enable Rater service: <true|false>.
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
[scheduler]
# enabled = false # Starts Scheduler service: <true|false>.
[cdrs]
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
# freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: <true|false>.
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# extra_fields = # Extra fields to store in CDRs
# freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: <true|false>.
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# extra_fields = # Extra fields to store in CDRs
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
@@ -54,9 +54,9 @@
# subject_fields = subject # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
@@ -64,7 +64,7 @@
# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
[session_manager]
# enabled = false # Starts SessionManager service: <true|false>.
# enabled = false # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
@@ -72,6 +72,14 @@
[freeswitch]
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
# passwd = ClueCon # FreeSWITCH socket password.
# reconnects = 5 # Number of attempts on connect failure.
# passwd = ClueCon # FreeSWITCH socket password.
# reconnects = 5 # Number of attempts on connect failure.
[history_agent]
#enabled = false # Starts History as a client: <true|false>.
#server = 127.0.0.1:2013 # Address where to reach the master history server: <internal|x.y.z.y:1234>
[history_server]
#enabled = false # Starts History service: <true|false>.
#listen = 127.0.0.1:2013 # Listening addres for history server: <internal|x.y.z.y:1234>
#path = /var/log/cgrates/history # Location on disk where to store history files.

View File

@@ -131,6 +131,9 @@ func (ms *MongoStorage) GetRatingProfile(key string) (rp *RatingProfile, err err
}
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
if historyScribe != nil {
go historyScribe.Record(RATING_PROFILE_PREFIX+rp.Id, rp)
}
return ms.db.C("ratingprofiles").Insert(rp)
}
@@ -144,6 +147,9 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err
}
func (ms *MongoStorage) SetDestination(dest *Destination) error {
if historyScribe != nil {
go historyScribe.Record(DESTINATION_PREFIX+dest.Id, dest)
}
return ms.db.C("destinations").Insert(dest)
}

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"github.com/cgrates/cgrates/utils"
"log"
"menteslibres.net/gosexy/redis"
"strconv"
"strings"
@@ -80,6 +81,9 @@ func (rs *RedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err err
func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) {
result, err := rs.ms.Marshal(rp)
_, err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result)
if err != nil && historyScribe != nil {
go historyScribe.Record(RATING_PROFILE_PREFIX+rp.Id, rp)
}
return
}
@@ -98,6 +102,11 @@ func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
return
}
_, err = rs.db.Set(DESTINATION_PREFIX+dest.Id, result)
log.Print("before")
if err != nil && historyScribe != nil {
log.Print("scribe: ", historyScribe)
go historyScribe.Record(DESTINATION_PREFIX+dest.Id, dest)
}
return
}

View File

@@ -75,6 +75,11 @@ func (s *FileScribe) Record(key string, obj interface{}) error {
}
func (s *FileScribe) gitInit() error {
if _, err := os.Stat(s.fileRoot); os.IsNotExist(err) {
if err := os.MkdirAll(s.fileRoot, os.ModeDir|0755); err != nil {
return errors.New("<History> Error creating history folder: " + err.Error())
}
}
if _, err := os.Stat(filepath.Join(s.fileRoot, ".git")); os.IsNotExist(err) {
cmd := exec.Command(s.gitCommand, "init")
cmd.Dir = s.fileRoot
@@ -82,16 +87,16 @@ func (s *FileScribe) gitInit() error {
return errors.New(string(out) + " " + err.Error())
}
if f, err := os.Create(filepath.Join(s.fileRoot, DESTINATIONS_FILE)); err != nil {
return err
return errors.New("<History> Error writing destinations file: " + err.Error())
} else {
f.Close()
}
if f, err := os.Create(filepath.Join(s.fileRoot, RATING_PROFILES_FILE)); err != nil {
return err
return errors.New("<History> Error writing rating profiles file: " + err.Error())
} else {
f.Close()
}
cmd = exec.Command(s.gitCommand, "add")
cmd = exec.Command(s.gitCommand, "add", ".")
cmd.Dir = s.fileRoot
if out, err := cmd.Output(); err != nil {
return errors.New(string(out) + " " + err.Error())
@@ -119,13 +124,13 @@ func (s *FileScribe) load(filename string) error {
switch filename {
case DESTINATIONS_FILE:
if err := d.Decode(&s.destinations); err != nil {
return err
if err := d.Decode(&s.destinations); err != nil && err != io.EOF {
return errors.New("<History> Error loading destinations: " + err.Error())
}
s.destinations.Sort()
case RATING_PROFILES_FILE:
if err := d.Decode(&s.ratingProfiles); err != nil {
return err
if err := d.Decode(&s.ratingProfiles); err != nil && err != io.EOF {
return errors.New("<History> Error loading rating profiles: " + err.Error())
}
s.ratingProfiles.Sort()
}