From 1f7f79def85e5a4cb233ef68a62ed59046c8f819 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 12 Apr 2013 19:01:09 +0200 Subject: [PATCH] Mediator modify to support db only rates, bug fixup on passing references to indexes, config modification with default indexes pointing to -1/db logs, mongodb as default logdb storage --- cmd/cgr-rater/cgr-rater.go | 18 +++--- config/config.go | 34 +++++----- data/conf/cgrates.cfg | 100 ++++++++++++++--------------- mediator/mediator.go | 18 +++--- sessionmanager/fssessionmanager.go | 5 +- sessionmanager/session.go | 8 ++- 6 files changed, 93 insertions(+), 90 deletions(-) diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index ea20bcf09..df7e9eab9 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -258,21 +258,23 @@ func main() { var getter, loggerDb rater.DataStorage getter, err = configureDatabase(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass) - - if err == nil { - defer getter.Close() - rater.SetDataStorage(getter) + if err != nil { // Cannot configure getter database, show stopper + rater.Logger.Crit(fmt.Sprintf("Could not configure database: %s exiting!", err)) + return } - + defer getter.Close() + rater.SetDataStorage(getter) if cfg.LogDBType == SAME { loggerDb = getter } else { loggerDb, err = configureDatabase(cfg.LogDBType, cfg.LogDBHost, cfg.LogDBPort, cfg.LogDBName, cfg.LogDBUser, cfg.LogDBPass) } - if err == nil { - defer loggerDb.Close() - rater.SetStorageLogger(loggerDb) + if err != nil { // Cannot configure logger database, show stopper + rater.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) + return } + defer loggerDb.Close() + rater.SetStorageLogger(loggerDb) if cfg.SMDebitInterval > 0 { if dp, err := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)); err == nil { diff --git a/config/config.go b/config/config.go index 4961f1c49..7ebe573e1 100644 --- a/config/config.go +++ b/config/config.go @@ -121,7 +121,7 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) { if hasOpt = c.HasOption("global", "datadb_passwd"); hasOpt { cfg.DataDBPass, _ = c.GetString("global", "datadb_passwd") } - cfg.LogDBType = MONGO + cfg.LogDBType = MONGO if hasOpt = c.HasOption("global", "logdb_type"); hasOpt { cfg.LogDBType, _ = c.GetString("global", "logdb_type") } @@ -137,11 +137,11 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) { if hasOpt = c.HasOption("global", "logdb_name"); hasOpt { cfg.LogDBName, _ = c.GetString("global", "logdb_name") } - cfg.LogDBUser = "cgrates" + cfg.LogDBUser = "" if hasOpt = c.HasOption("global", "logdb_user"); hasOpt { cfg.LogDBUser, _ = c.GetString("global", "logdb_user") } - cfg.LogDBPass = "CGRateS.org" + cfg.LogDBPass = "" if hasOpt = c.HasOption("global", "logdb_passwd"); hasOpt { cfg.LogDBPass, _ = c.GetString("global", "logdb_passwd") } @@ -181,11 +181,11 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) { if hasOpt = c.HasOption("mediator", "enabled"); hasOpt { cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled") } - cfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv/" + cfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv" if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt { cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir") } - cfg.MediatorCDROutDir = "/var/log/cgrates/cdr_out/" + cfg.MediatorCDROutDir = "/var/log/cgrates/cdr_out" if hasOpt = c.HasOption("mediator", "cdr_out_dir"); hasOpt { cfg.MediatorCDROutDir, _ = c.GetString("mediator", "cdr_out_dir") } @@ -257,42 +257,42 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) { if hasOpt = c.HasOption("freeswitch", "reconnects"); hasOpt { cfg.FreeswitchReconnects, _ = c.GetInt("freeswitch", "reconnects") } - cfg.FreeswitchTORIdx = "" + cfg.FreeswitchUUIDIdx = "10" + if hasOpt = c.HasOption("freeswitch", "uuid_index"); hasOpt { + cfg.FreeswitchUUIDIdx, _ = c.GetString("freeswitch", "uuid_index") + } + cfg.FreeswitchTORIdx = "-1" if hasOpt = c.HasOption("freeswitch", "tor_index"); hasOpt { cfg.FreeswitchTORIdx, _ = c.GetString("freeswitch", "tor_index") } - cfg.FreeswitchTenantIdx = "" + cfg.FreeswitchTenantIdx = "-1" if hasOpt = c.HasOption("freeswitch", "tenant_index"); hasOpt { cfg.FreeswitchTenantIdx, _ = c.GetString("freeswitch", "tenant_index") } - cfg.FreeswitchDirectionIdx = "" + cfg.FreeswitchDirectionIdx = "-1" if hasOpt = c.HasOption("freeswitch", "direction_index"); hasOpt { cfg.FreeswitchDirectionIdx, _ = c.GetString("freeswitch", "direction_index") } - cfg.FreeswitchSubjectIdx = "" + cfg.FreeswitchSubjectIdx = "-1" if hasOpt = c.HasOption("freeswitch", "subject_index"); hasOpt { cfg.FreeswitchSubjectIdx, _ = c.GetString("freeswitch", "subject_index") } - cfg.FreeswitchAccountIdx = "" + cfg.FreeswitchAccountIdx = "-1" if hasOpt = c.HasOption("freeswitch", "account_index"); hasOpt { cfg.FreeswitchAccountIdx, _ = c.GetString("freeswitch", "account_index") } - cfg.FreeswitchDestIdx = "" + cfg.FreeswitchDestIdx = "-1" if hasOpt = c.HasOption("freeswitch", "destination_index"); hasOpt { cfg.FreeswitchDestIdx, _ = c.GetString("freeswitch", "destination_index") } - cfg.FreeswitchTimeStartIdx = "" + cfg.FreeswitchTimeStartIdx = "-1" if hasOpt = c.HasOption("freeswitch", "time_start_index"); hasOpt { cfg.FreeswitchTimeStartIdx, _ = c.GetString("freeswitch", "time_start_index") } - cfg.FreeswitchDurationIdx = "" + cfg.FreeswitchDurationIdx = "-1" if hasOpt = c.HasOption("freeswitch", "duration_index"); hasOpt { cfg.FreeswitchDurationIdx, _ = c.GetString("freeswitch", "duration_index") } - cfg.FreeswitchUUIDIdx = "" - if hasOpt = c.HasOption("freeswitch", "uuid_index"); hasOpt { - cfg.FreeswitchUUIDIdx, _ = c.GetString("freeswitch", "uuid_index") - } return cfg, nil diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index fc1872d96..e9bb659aa 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -1,70 +1,70 @@ -# CGRateS Configuration fileRPC encoding used: . +# CGRateS Configuration file # # This file contains the default configuration hardcoded into CGRateS. # This is what you get when you load CGRateS with an empty configuration file. # [global] must exist in all files, rest of the configuration is inter-changeable. [global] -# datadb_type = redis # The main database: . -# datadb_host = 127.0.0.1 # Database host address. -# datadb_port = 6379 # Port to reach the database. -# datadb_name = 10 # The name of the database to connect to. -# datadb_user = # Username to use when connecting to database. -# datadb_passwd = # Password to use when connecting to database. -# logdb_type = mongo # Log/stored database type to use: -# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets. -# logdb_port = 27017 # The port to reach the logdb. -# logdb_name = cgrates # The name of the log database to connect to. -# logdb_user = cgrates # Username to use when connecting to logdb. -# logdb_passwd = CGRateS.org # Password to use when connecting to logdb. +# datadb_type = redis # The main database: . +# datadb_host = 127.0.0.1 # Database host address. +# datadb_port = 6379 # Port to reach the database. +# datadb_name = 10 # The name of the database to connect to. +# datadb_user = # Username to use when connecting to database. +# datadb_passwd = # Password to use when connecting to database. +# logdb_type = mongo # Log/stored database type to use: +# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets. +# logdb_port = 27017 # The port to reach the logdb. +# logdb_name = cgrates # The name of the log database to connect to. +# logdb_user = # Username to use when connecting to logdb. +# logdb_passwd = # Password to use when connecting to logdb. [balancer] -# enabled = false # Start Balancer service: . -# listen = 127.0.0.1:2012 # Balancer listen interface: . -# rpc_encoding = gob # RPC encoding used: . +# enabled = false # Start Balancer service: . +# listen = 127.0.0.1:2012 # Balancer listen interface: . +# rpc_encoding = gob # RPC encoding used: . [rater] -# enabled = false # Enable Rater service: . -# balancer = disabled # Register to Balancer as worker: . -# listen = 127.0.0.1:2012 # Rater's listening interface: . -# rpc_encoding = gob # RPC encoding used: . +# enabled = false # Enable Rater service: . +# balancer = disabled # Register to Balancer as worker: . +# listen = 127.0.0.1:2012 # Rater's listening interface: . +# rpc_encoding = gob # RPC encoding used: . [scheduler] -# enabled = false # Starts Scheduler service: . +# enabled = false # Starts Scheduler service: . [mediator] -# enabled = false # Starts Mediator service: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. -# rpc_encoding = gob # RPC encoding used when talking to Rater: . -# skipdb = false # Skips database checks for previous recorded prices: . -# pseudoprepaid = false # Execute debits together with pricing: . -# cdr_type = freeswitch_cdr # CDR type . -# cdr_in_dir = /var/log/freeswitch/cdr-csv/ # Absolute path towards the directory where the CDRs are kept. -# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported. +# enabled = false # Starts Mediator service: . +# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# rpc_encoding = gob # RPC encoding used when talking to Rater: . +# skipdb = false # Skips database checks for previous recorded prices: . +# pseudoprepaid = false # Execute debits together with pricing: . +# cdr_type = freeswitch_cdr # CDR type . +# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept. +# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported. [session_manager] -# enabled = false # Starts SessionManager service: . -# switch_type = freeswitch # Defines the type of switch behind: . -# rater = 127.0.0.1:2012 # Address where to reach the Rater. -# debit_interval = 5 # Interval to perform debits on. -# rpc_encoding = gob # RPC encoding used when talking to Rater: . -# default_reqtype = prepaid # Default request type to consider when missing from requests: <""|prepaid|postpaid>. -# default_tor = 0 # Default Type of Record to consider when missing from requests. -# default_tenant = 0 # Default Tenant to consider when missing from requests. -# default_subject = 0 # Default rating Subject to consider when missing from requests. +# enabled = false # Starts SessionManager service: . +# switch_type = freeswitch # Defines the type of switch behind: . +# rater = 127.0.0.1:2012 # Address where to reach the Rater. +# debit_interval = 5 # Interval to perform debits on. +# rpc_encoding = gob # RPC encoding used when talking to Rater: . +# default_reqtype = # Default request type to consider when missing from requests: <""|prepaid|postpaid>. +# default_tor = 0 # Default Type of Record to consider when missing from requests. +# default_tenant = 0 # Default Tenant to consider when missing from requests. +# default_subject = 0 # Default rating Subject to consider when missing from requests. [freeswitch] -# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. -# passwd = ClueCon # FreeSWITCH socket password. -# reconnects = 5 # Number of attempts on connect failure. -# uuid_index = # Index of the UUID info in the CDR file. -# direction_index = # Index of the CallDirection info in the CDR file. -# tor_index = # Index of the TypeOfRecord info in the CDR file. -# tenant_index = # Index of the Tenant info in the CDR file. -# subject_index = # Index of the Subject info in the CDR file. -# account_index = # Index of the Account info in the CDR file. -# destination_index = # Index of the Destination info in the CDR file. -# time_start_index = # Index of the TimeStart info in the CDR file. -# duration_index = # Index of the CallDuration info in the CDR file. +# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. +# passwd = ClueCon # FreeSWITCH socket password. +# reconnects = 5 # Number of attempts on connect failure. +# uuid_index = 10 # Index of the UUID info in the CDR file. +# direction_index = -1 # Index of the CallDirection info in the CDR file. +# tor_index = -1 # Index of the TypeOfRecord info in the CDR file. +# tenant_index = -1 # Index of the Tenant info in the CDR file. +# subject_index = -1 # Index of the Subject info in the CDR file. +# account_index = -1 # Index of the Account info in the CDR file. +# destination_index = -1 # Index of the Destination info in the CDR file. +# time_start_index = -1 # Index of the TimeStart info in the CDR file. +# duration_index = -1 # Index of the CallDuration info in the CDR file. diff --git a/mediator/mediator.go b/mediator/mediator.go index 72389a96d..f2925a11b 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -84,8 +84,8 @@ func NewMediator(connector rater.Connector, } idxs := []string{directionIndexs, torIndexs, tenantIndexs, subjectIndexs, accountIndexs, destinationIndexs, timeStartIndexs, durationIndexs, uuidIndexs} - objs := []mediatorFieldIdxs{m.directionIndexs, m.torIndexs, m.tenantIndexs, m.subjectIndexs, - m.accountIndexs, m.destinationIndexs, m.timeStartIndexs, m.durationIndexs, m.uuidIndexs} + objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs, + &m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs} for i, o := range objs { err = o.Load(idxs[i]) if err != nil { @@ -125,7 +125,7 @@ func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) { select { case ev := <-watcher.Event: if ev.Mask&inotify.IN_MOVED_TO != 0 { - rater.Logger.Info(fmt.Sprintf("Started to parse %v", ev.Name)) + rater.Logger.Info(fmt.Sprintf("Parsing: %v", ev.Name)) err = m.parseCSV(ev.Name) if err != nil { return err @@ -157,25 +157,27 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { defer fout.Close() w := bufio.NewWriter(fout) - for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { //t, _ := time.Parse("2012-05-21 17:48:20", record[5]) var cc *rater.CallCost - for runIdx := range m.subjectIndexs { // Query costs for every run index given by subject - if runIdx == 0 && !m.skipDb { // The first index is matching the session manager one + for runIdx,idxVal := range m.subjectIndexs { // Query costs for every run index given by subject + if idxVal == -1 { // -1 as subject means use database to get previous set price cc, err = m.getCostsFromDB(record, runIdx) if err != nil || cc == nil { // Fallback on rater if no db record found - cc, err = m.getCostsFromRater(record, runIdx) + rater.Logger.Err(fmt.Sprintf(" Error extracting price from database for uuid: <%s>, err: <%s>, cost: %v",record[m.uuidIndexs[runIdx]], err.Error(), cc)) + //cc, err = m.getCostsFromRater(record, runIdx) + continue } } else { cc, err = m.getCostsFromRater(record, runIdx) + } cost := "-1" if err != nil { rater.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record with uuid:%s and subject:%s - %s", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], err.Error())) } else { cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) - rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, subject:%s cost: %v", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], cost)) + rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, cost: %v", record[m.uuidIndexs[runIdx]], cost)) } record = append(record, cost) } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 694dad2ae..b36c73905 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -108,7 +108,6 @@ func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) { if err != nil { rater.Logger.Err("could not send disconect msg to freeswitch") } - s.Close() return } @@ -194,7 +193,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { if s == nil { // Not handled by us return } - sm.RemoveSession(s) // Session cleanup from memory + defer s.Close() // Stop loop and save the costs deducted so far to database if ev.GetReqType() == REQTYPE_POSTPAID { startTime, err := ev.GetStartTime(START_TIME) if err != nil { @@ -301,8 +300,6 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } lastCC.Cost -= cost rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds)) - s.SaveOperations() - s.Close() } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 482a74343..0dc7acde2 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -118,6 +118,7 @@ func (s *Session) Close() { } s.stopDebit <- true s.callDescriptor.TimeEnd = time.Now() + s.SaveOperations() s.sessionManager.RemoveSession(s) } @@ -136,9 +137,10 @@ func (s *Session) SaveOperations() { for _, cc := range s.CallCosts[1:] { firstCC.Merge(cc) } - if s.sessionManager.GetDbLogger() != nil { - s.sessionManager.GetDbLogger().LogCallCost(s.uuid, rater.SESSION_MANAGER_SOURCE, firstCC) + if s.sessionManager.GetDbLogger() == nil { + rater.Logger.Err(" Error: no connection to logger database, cannot save costs") } - rater.Logger.Debug(firstCC.String()) + s.sessionManager.GetDbLogger().LogCallCost(s.uuid, rater.SESSION_MANAGER_SOURCE, firstCC) + rater.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) }() }