mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Mediator modify to support db only rates, bug fixup on passing references to indexes, config modification with default indexes pointing to -1/db logs, mongodb as default logdb storage
This commit is contained in:
@@ -258,21 +258,23 @@ func main() {
|
||||
|
||||
var getter, loggerDb rater.DataStorage
|
||||
getter, err = configureDatabase(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass)
|
||||
|
||||
if err == nil {
|
||||
defer getter.Close()
|
||||
rater.SetDataStorage(getter)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
rater.Logger.Crit(fmt.Sprintf("Could not configure database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
|
||||
defer getter.Close()
|
||||
rater.SetDataStorage(getter)
|
||||
if cfg.LogDBType == SAME {
|
||||
loggerDb = getter
|
||||
} else {
|
||||
loggerDb, err = configureDatabase(cfg.LogDBType, cfg.LogDBHost, cfg.LogDBPort, cfg.LogDBName, cfg.LogDBUser, cfg.LogDBPass)
|
||||
}
|
||||
if err == nil {
|
||||
defer loggerDb.Close()
|
||||
rater.SetStorageLogger(loggerDb)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
rater.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer loggerDb.Close()
|
||||
rater.SetStorageLogger(loggerDb)
|
||||
|
||||
if cfg.SMDebitInterval > 0 {
|
||||
if dp, err := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)); err == nil {
|
||||
|
||||
@@ -121,7 +121,7 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("global", "datadb_passwd"); hasOpt {
|
||||
cfg.DataDBPass, _ = c.GetString("global", "datadb_passwd")
|
||||
}
|
||||
cfg.LogDBType = MONGO
|
||||
cfg.LogDBType = MONGO
|
||||
if hasOpt = c.HasOption("global", "logdb_type"); hasOpt {
|
||||
cfg.LogDBType, _ = c.GetString("global", "logdb_type")
|
||||
}
|
||||
@@ -137,11 +137,11 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("global", "logdb_name"); hasOpt {
|
||||
cfg.LogDBName, _ = c.GetString("global", "logdb_name")
|
||||
}
|
||||
cfg.LogDBUser = "cgrates"
|
||||
cfg.LogDBUser = ""
|
||||
if hasOpt = c.HasOption("global", "logdb_user"); hasOpt {
|
||||
cfg.LogDBUser, _ = c.GetString("global", "logdb_user")
|
||||
}
|
||||
cfg.LogDBPass = "CGRateS.org"
|
||||
cfg.LogDBPass = ""
|
||||
if hasOpt = c.HasOption("global", "logdb_passwd"); hasOpt {
|
||||
cfg.LogDBPass, _ = c.GetString("global", "logdb_passwd")
|
||||
}
|
||||
@@ -181,11 +181,11 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
|
||||
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
|
||||
}
|
||||
cfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv/"
|
||||
cfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
|
||||
if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt {
|
||||
cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir")
|
||||
}
|
||||
cfg.MediatorCDROutDir = "/var/log/cgrates/cdr_out/"
|
||||
cfg.MediatorCDROutDir = "/var/log/cgrates/cdr_out"
|
||||
if hasOpt = c.HasOption("mediator", "cdr_out_dir"); hasOpt {
|
||||
cfg.MediatorCDROutDir, _ = c.GetString("mediator", "cdr_out_dir")
|
||||
}
|
||||
@@ -257,42 +257,42 @@ func NewCGRConfig(cfgPath *string) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("freeswitch", "reconnects"); hasOpt {
|
||||
cfg.FreeswitchReconnects, _ = c.GetInt("freeswitch", "reconnects")
|
||||
}
|
||||
cfg.FreeswitchTORIdx = ""
|
||||
cfg.FreeswitchUUIDIdx = "10"
|
||||
if hasOpt = c.HasOption("freeswitch", "uuid_index"); hasOpt {
|
||||
cfg.FreeswitchUUIDIdx, _ = c.GetString("freeswitch", "uuid_index")
|
||||
}
|
||||
cfg.FreeswitchTORIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "tor_index"); hasOpt {
|
||||
cfg.FreeswitchTORIdx, _ = c.GetString("freeswitch", "tor_index")
|
||||
}
|
||||
cfg.FreeswitchTenantIdx = ""
|
||||
cfg.FreeswitchTenantIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "tenant_index"); hasOpt {
|
||||
cfg.FreeswitchTenantIdx, _ = c.GetString("freeswitch", "tenant_index")
|
||||
}
|
||||
cfg.FreeswitchDirectionIdx = ""
|
||||
cfg.FreeswitchDirectionIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "direction_index"); hasOpt {
|
||||
cfg.FreeswitchDirectionIdx, _ = c.GetString("freeswitch", "direction_index")
|
||||
}
|
||||
cfg.FreeswitchSubjectIdx = ""
|
||||
cfg.FreeswitchSubjectIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "subject_index"); hasOpt {
|
||||
cfg.FreeswitchSubjectIdx, _ = c.GetString("freeswitch", "subject_index")
|
||||
}
|
||||
cfg.FreeswitchAccountIdx = ""
|
||||
cfg.FreeswitchAccountIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "account_index"); hasOpt {
|
||||
cfg.FreeswitchAccountIdx, _ = c.GetString("freeswitch", "account_index")
|
||||
}
|
||||
cfg.FreeswitchDestIdx = ""
|
||||
cfg.FreeswitchDestIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "destination_index"); hasOpt {
|
||||
cfg.FreeswitchDestIdx, _ = c.GetString("freeswitch", "destination_index")
|
||||
}
|
||||
cfg.FreeswitchTimeStartIdx = ""
|
||||
cfg.FreeswitchTimeStartIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "time_start_index"); hasOpt {
|
||||
cfg.FreeswitchTimeStartIdx, _ = c.GetString("freeswitch", "time_start_index")
|
||||
}
|
||||
cfg.FreeswitchDurationIdx = ""
|
||||
cfg.FreeswitchDurationIdx = "-1"
|
||||
if hasOpt = c.HasOption("freeswitch", "duration_index"); hasOpt {
|
||||
cfg.FreeswitchDurationIdx, _ = c.GetString("freeswitch", "duration_index")
|
||||
}
|
||||
cfg.FreeswitchUUIDIdx = ""
|
||||
if hasOpt = c.HasOption("freeswitch", "uuid_index"); hasOpt {
|
||||
cfg.FreeswitchUUIDIdx, _ = c.GetString("freeswitch", "uuid_index")
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
|
||||
|
||||
@@ -1,70 +1,70 @@
|
||||
# CGRateS Configuration fileRPC encoding used: <gob|json>.
|
||||
# CGRateS Configuration file
|
||||
#
|
||||
# This file contains the default configuration hardcoded into CGRateS.
|
||||
# This is what you get when you load CGRateS with an empty configuration file.
|
||||
# [global] must exist in all files, rest of the configuration is inter-changeable.
|
||||
|
||||
[global]
|
||||
# datadb_type = redis # The main database: <redis>.
|
||||
# datadb_host = 127.0.0.1 # Database host address.
|
||||
# datadb_port = 6379 # Port to reach the database.
|
||||
# datadb_name = 10 # The name of the database to connect to.
|
||||
# datadb_user = # Username to use when connecting to database.
|
||||
# datadb_passwd = # Password to use when connecting to database.
|
||||
# logdb_type = mongo # Log/stored database type to use: <same|postgres|mongo|redis>
|
||||
# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
|
||||
# logdb_port = 27017 # The port to reach the logdb.
|
||||
# logdb_name = cgrates # The name of the log database to connect to.
|
||||
# logdb_user = cgrates # Username to use when connecting to logdb.
|
||||
# logdb_passwd = CGRateS.org # Password to use when connecting to logdb.
|
||||
# datadb_type = redis # The main database: <redis>.
|
||||
# datadb_host = 127.0.0.1 # Database host address.
|
||||
# datadb_port = 6379 # Port to reach the database.
|
||||
# datadb_name = 10 # The name of the database to connect to.
|
||||
# datadb_user = # Username to use when connecting to database.
|
||||
# datadb_passwd = # Password to use when connecting to database.
|
||||
# logdb_type = mongo # Log/stored database type to use: <same|postgres|mongo|redis>
|
||||
# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
|
||||
# logdb_port = 27017 # The port to reach the logdb.
|
||||
# logdb_name = cgrates # The name of the log database to connect to.
|
||||
# logdb_user = # Username to use when connecting to logdb.
|
||||
# logdb_passwd = # Password to use when connecting to logdb.
|
||||
|
||||
[balancer]
|
||||
# enabled = false # Start Balancer service: <true|false>.
|
||||
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
|
||||
# rpc_encoding = gob # RPC encoding used: <gob|json>.
|
||||
# enabled = false # Start Balancer service: <true|false>.
|
||||
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
|
||||
# rpc_encoding = gob # RPC encoding used: <gob|json>.
|
||||
|
||||
[rater]
|
||||
# enabled = false # Enable Rater service: <true|false>.
|
||||
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
|
||||
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
|
||||
# rpc_encoding = gob # RPC encoding used: <gob|json>.
|
||||
# enabled = false # Enable Rater service: <true|false>.
|
||||
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
|
||||
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
|
||||
# rpc_encoding = gob # RPC encoding used: <gob|json>.
|
||||
|
||||
[scheduler]
|
||||
# enabled = false # Starts Scheduler service: <true|false>.
|
||||
# enabled = false # Starts Scheduler service: <true|false>.
|
||||
|
||||
[mediator]
|
||||
# enabled = false # Starts Mediator service: <true|false>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
|
||||
# skipdb = false # Skips database checks for previous recorded prices: <true|false>.
|
||||
# pseudoprepaid = false # Execute debits together with pricing: <true|false>.
|
||||
# cdr_type = freeswitch_cdr # CDR type <freeswitch_cdr>.
|
||||
# cdr_in_dir = /var/log/freeswitch/cdr-csv/ # Absolute path towards the directory where the CDRs are kept.
|
||||
# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported.
|
||||
# enabled = false # Starts Mediator service: <true|false>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
|
||||
# skipdb = false # Skips database checks for previous recorded prices: <true|false>.
|
||||
# pseudoprepaid = false # Execute debits together with pricing: <true|false>.
|
||||
# cdr_type = freeswitch_cdr # CDR type <freeswitch_cdr>.
|
||||
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept.
|
||||
# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported.
|
||||
|
||||
[session_manager]
|
||||
# enabled = false # Starts SessionManager service: <true|false>.
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# debit_interval = 5 # Interval to perform debits on.
|
||||
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
|
||||
# default_reqtype = prepaid # Default request type to consider when missing from requests: <""|prepaid|postpaid>.
|
||||
# default_tor = 0 # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = 0 # Default Tenant to consider when missing from requests.
|
||||
# default_subject = 0 # Default rating Subject to consider when missing from requests.
|
||||
# enabled = false # Starts SessionManager service: <true|false>.
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
|
||||
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
|
||||
# debit_interval = 5 # Interval to perform debits on.
|
||||
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
|
||||
# default_reqtype = # Default request type to consider when missing from requests: <""|prepaid|postpaid>.
|
||||
# default_tor = 0 # Default Type of Record to consider when missing from requests.
|
||||
# default_tenant = 0 # Default Tenant to consider when missing from requests.
|
||||
# default_subject = 0 # Default rating Subject to consider when missing from requests.
|
||||
|
||||
[freeswitch]
|
||||
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
|
||||
# passwd = ClueCon # FreeSWITCH socket password.
|
||||
# reconnects = 5 # Number of attempts on connect failure.
|
||||
# uuid_index = # Index of the UUID info in the CDR file.
|
||||
# direction_index = # Index of the CallDirection info in the CDR file.
|
||||
# tor_index = # Index of the TypeOfRecord info in the CDR file.
|
||||
# tenant_index = # Index of the Tenant info in the CDR file.
|
||||
# subject_index = # Index of the Subject info in the CDR file.
|
||||
# account_index = # Index of the Account info in the CDR file.
|
||||
# destination_index = # Index of the Destination info in the CDR file.
|
||||
# time_start_index = # Index of the TimeStart info in the CDR file.
|
||||
# duration_index = # Index of the CallDuration info in the CDR file.
|
||||
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
|
||||
# passwd = ClueCon # FreeSWITCH socket password.
|
||||
# reconnects = 5 # Number of attempts on connect failure.
|
||||
# uuid_index = 10 # Index of the UUID info in the CDR file.
|
||||
# direction_index = -1 # Index of the CallDirection info in the CDR file.
|
||||
# tor_index = -1 # Index of the TypeOfRecord info in the CDR file.
|
||||
# tenant_index = -1 # Index of the Tenant info in the CDR file.
|
||||
# subject_index = -1 # Index of the Subject info in the CDR file.
|
||||
# account_index = -1 # Index of the Account info in the CDR file.
|
||||
# destination_index = -1 # Index of the Destination info in the CDR file.
|
||||
# time_start_index = -1 # Index of the TimeStart info in the CDR file.
|
||||
# duration_index = -1 # Index of the CallDuration info in the CDR file.
|
||||
|
||||
|
||||
|
||||
@@ -84,8 +84,8 @@ func NewMediator(connector rater.Connector,
|
||||
}
|
||||
idxs := []string{directionIndexs, torIndexs, tenantIndexs, subjectIndexs, accountIndexs,
|
||||
destinationIndexs, timeStartIndexs, durationIndexs, uuidIndexs}
|
||||
objs := []mediatorFieldIdxs{m.directionIndexs, m.torIndexs, m.tenantIndexs, m.subjectIndexs,
|
||||
m.accountIndexs, m.destinationIndexs, m.timeStartIndexs, m.durationIndexs, m.uuidIndexs}
|
||||
objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs,
|
||||
&m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs}
|
||||
for i, o := range objs {
|
||||
err = o.Load(idxs[i])
|
||||
if err != nil {
|
||||
@@ -125,7 +125,7 @@ func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) {
|
||||
select {
|
||||
case ev := <-watcher.Event:
|
||||
if ev.Mask&inotify.IN_MOVED_TO != 0 {
|
||||
rater.Logger.Info(fmt.Sprintf("Started to parse %v", ev.Name))
|
||||
rater.Logger.Info(fmt.Sprintf("Parsing: %v", ev.Name))
|
||||
err = m.parseCSV(ev.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -157,25 +157,27 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
|
||||
defer fout.Close()
|
||||
|
||||
w := bufio.NewWriter(fout)
|
||||
|
||||
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
|
||||
//t, _ := time.Parse("2012-05-21 17:48:20", record[5])
|
||||
var cc *rater.CallCost
|
||||
for runIdx := range m.subjectIndexs { // Query costs for every run index given by subject
|
||||
if runIdx == 0 && !m.skipDb { // The first index is matching the session manager one
|
||||
for runIdx,idxVal := range m.subjectIndexs { // Query costs for every run index given by subject
|
||||
if idxVal == -1 { // -1 as subject means use database to get previous set price
|
||||
cc, err = m.getCostsFromDB(record, runIdx)
|
||||
if err != nil || cc == nil { // Fallback on rater if no db record found
|
||||
cc, err = m.getCostsFromRater(record, runIdx)
|
||||
rater.Logger.Err(fmt.Sprintf("<Mediator> Error extracting price from database for uuid: <%s>, err: <%s>, cost: %v",record[m.uuidIndexs[runIdx]], err.Error(), cc))
|
||||
//cc, err = m.getCostsFromRater(record, runIdx)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
cc, err = m.getCostsFromRater(record, runIdx)
|
||||
|
||||
}
|
||||
cost := "-1"
|
||||
if err != nil {
|
||||
rater.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record with uuid:%s and subject:%s - %s", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], err.Error()))
|
||||
} else {
|
||||
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
|
||||
rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, subject:%s cost: %v", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], cost))
|
||||
rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, cost: %v", record[m.uuidIndexs[runIdx]], cost))
|
||||
}
|
||||
record = append(record, cost)
|
||||
}
|
||||
|
||||
@@ -108,7 +108,6 @@ func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) {
|
||||
if err != nil {
|
||||
rater.Logger.Err("could not send disconect msg to freeswitch")
|
||||
}
|
||||
s.Close()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -194,7 +193,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
|
||||
if s == nil { // Not handled by us
|
||||
return
|
||||
}
|
||||
sm.RemoveSession(s) // Session cleanup from memory
|
||||
defer s.Close() // Stop loop and save the costs deducted so far to database
|
||||
if ev.GetReqType() == REQTYPE_POSTPAID {
|
||||
startTime, err := ev.GetStartTime(START_TIME)
|
||||
if err != nil {
|
||||
@@ -301,8 +300,6 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
|
||||
}
|
||||
lastCC.Cost -= cost
|
||||
rater.Logger.Info(fmt.Sprintf("Rambursed %v cents, %v seconds", cost, seconds))
|
||||
s.SaveOperations()
|
||||
s.Close()
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -118,6 +118,7 @@ func (s *Session) Close() {
|
||||
}
|
||||
s.stopDebit <- true
|
||||
s.callDescriptor.TimeEnd = time.Now()
|
||||
s.SaveOperations()
|
||||
s.sessionManager.RemoveSession(s)
|
||||
}
|
||||
|
||||
@@ -136,9 +137,10 @@ func (s *Session) SaveOperations() {
|
||||
for _, cc := range s.CallCosts[1:] {
|
||||
firstCC.Merge(cc)
|
||||
}
|
||||
if s.sessionManager.GetDbLogger() != nil {
|
||||
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, rater.SESSION_MANAGER_SOURCE, firstCC)
|
||||
if s.sessionManager.GetDbLogger() == nil {
|
||||
rater.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
|
||||
}
|
||||
rater.Logger.Debug(firstCC.String())
|
||||
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, rater.SESSION_MANAGER_SOURCE, firstCC)
|
||||
rater.Logger.Debug(fmt.Sprintf("<SessionManager> End of call, having costs: %v", firstCC.String()))
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user