FS Event parsing cgr_category instead of cgr_tor, zero FS configuration support for passive billing scenarios

This commit is contained in:
DanB
2014-08-05 16:03:02 +02:00
parent 33e3f3fd66
commit 71a932e343
9 changed files with 112 additions and 47 deletions

View File

@@ -141,7 +141,7 @@ func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir
}
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn engine.Connector
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SMRater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
@@ -161,31 +161,30 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SMCdrS == cfg.SMRater {
cdrsConn = raterConn
} else if cfg.SMCdrS == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else {
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMCdrS, 0, cfg.SMReconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
switch cfg.SMSwitchType {
case FS:
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, dp)
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp)
case OSIPS:
var cdrsConn engine.Connector
if cfg.OsipCDRS == cfg.SMRater {
cdrsConn = raterConn
} else if cfg.OsipCDRS == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else {
for i := 0; i < cfg.OsipsReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.OsipCDRS, 0, cfg.SMReconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn)
default:
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))

View File

@@ -108,6 +108,7 @@ type CGRConfig struct {
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMCdrS string //
SMReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMMaxCallDuration time.Duration // The maximum duration of a call
@@ -129,7 +130,6 @@ type CGRConfig struct {
OsipsListenUdp string // Address where to listen for event datagrams coming from OpenSIPS
OsipsMiAddr string // Adress where to reach OpenSIPS mi_datagram module
OsipsEvSubscInterval time.Duration // Refresh event subscription at this interval
OsipCDRS string // Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013>
OsipsReconnects int // Number of attempts on connect failure.
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
@@ -217,6 +217,7 @@ func (self *CGRConfig) setDefaults() error {
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = utils.INTERNAL
self.SMCdrS = ""
self.SMReconnects = 3
self.SMDebitInterval = 10
self.SMMaxCallDuration = time.Duration(3) * time.Hour
@@ -231,7 +232,6 @@ func (self *CGRConfig) setDefaults() error {
self.OsipsListenUdp = "127.0.0.1:2020"
self.OsipsMiAddr = "127.0.0.1:8020"
self.OsipsEvSubscInterval = time.Duration(60) * time.Second
self.OsipCDRS = "internal"
self.OsipsReconnects = 3
self.HistoryAgentEnabled = false
self.HistoryServerEnabled = false
@@ -554,6 +554,9 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("session_manager", "rater"); hasOpt {
cfg.SMRater, _ = c.GetString("session_manager", "rater")
}
if hasOpt = c.HasOption("session_manager", "cdrs"); hasOpt {
cfg.SMCdrS, _ = c.GetString("session_manager", "cdrs")
}
if hasOpt = c.HasOption("session_manager", "reconnects"); hasOpt {
cfg.SMReconnects, _ = c.GetInt("session_manager", "reconnects")
}
@@ -608,9 +611,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, err
}
}
if hasOpt = c.HasOption("opensips", "cdrs"); hasOpt {
cfg.OsipCDRS, _ = c.GetString("opensips", "cdrs")
}
if hasOpt = c.HasOption("opensips", "reconnects"); hasOpt {
cfg.OsipsReconnects, _ = c.GetInt("opensips", "reconnects")
}

View File

@@ -117,6 +117,7 @@ func TestDefaults(t *testing.T) {
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS
eCfg.SMRater = utils.INTERNAL
eCfg.SMCdrS = ""
eCfg.SMReconnects = 3
eCfg.SMDebitInterval = 10
eCfg.SMMinCallDuration = time.Duration(0)
@@ -131,7 +132,6 @@ func TestDefaults(t *testing.T) {
eCfg.OsipsListenUdp = "127.0.0.1:2020"
eCfg.OsipsMiAddr = "127.0.0.1:8020"
eCfg.OsipsEvSubscInterval = time.Duration(60) * time.Second
eCfg.OsipCDRS = "internal"
eCfg.OsipsReconnects = 3
eCfg.DerivedChargers = make(utils.DerivedChargers, 0)
eCfg.CombinedDerivedChargers = true
@@ -274,6 +274,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.SMEnabled = true
eCfg.SMSwitchType = "test"
eCfg.SMRater = "test"
eCfg.SMCdrS = "test"
eCfg.SMReconnects = 99
eCfg.SMDebitInterval = 99
eCfg.SMMinCallDuration = time.Duration(98) * time.Second
@@ -288,7 +289,6 @@ func TestConfigFromFile(t *testing.T) {
eCfg.OsipsListenUdp = "test"
eCfg.OsipsMiAddr = "test"
eCfg.OsipsEvSubscInterval = time.Duration(99) * time.Second
eCfg.OsipCDRS = "test"
eCfg.OsipsReconnects = 99
eCfg.DerivedChargers = utils.DerivedChargers{&utils.DerivedCharger{RunId: "test", RunFilters: "", ReqTypeField: "test", DirectionField: "test", TenantField: "test",
CategoryField: "test", AccountField: "test", SubjectField: "test", DestinationField: "test", SetupTimeField: "test", AnswerTimeField: "test", UsageField: "test"}}

View File

@@ -94,7 +94,7 @@ enabled = true # Start the CDR stats service: <true|false>.
queue_length = 99 # Number of items in the stats buffer
time_window = 99 # Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
metrics = test # Stat metric ids to build
setup_interval = # Filter on CDR SetupTime
setup_interval = # Filter on CDR SetupTime
tors = test # Filter on CDR TOR fields
cdr_hosts= test # Filter on CDR CdrHost fields
cdr_sources = test # Filter on CDR CdrSource fields
@@ -105,16 +105,17 @@ categories = test # Filter on CDR Category fields
accounts = test # Filter on CDR Account fields
subjects = test # Filter on CDR Subject fields
destination_prefixes = test # Filter on CDR Destination prefixes
usage_interval = 99 # Filter on CDR Usage
mediation_run_ids = test # Filter on CDR MediationRunId fields
usage_interval = 99 # Filter on CDR Usage
mediation_run_ids = test # Filter on CDR MediationRunId fields
rated_accounts = test # Filter on CDR RatedAccount fields
rated_subjects = test # Filter on CDR RatedSubject fields
cost_intervals = 99 # Filter on CDR Cost
cost_intervals = 99 # Filter on CDR Cost
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.
switch_type = test # Defines the type of switch behind: <freeswitch>.
rater = test # Address where to reach the Rater.
cdrs = test # Address where to reach CDR Server, empty to disable CDR capturing <""|internal|127.0.0.1:2013>
reconnects = 99 # Number of reconnects to rater before giving up.
debit_interval = 99 # Interval to perform debits on.
min_call_duration = 98 # Only authorize calls with allowed duration bigger than this

View File

@@ -52,7 +52,6 @@
# cdrstats = # Address where to reach the cdrstats service: <internal|x.y.z.y:1234>
# store_disable = false # When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario
[cdre]
# cdr_format = csv # Exported CDRs format <csv>
# data_usage_multiply_factor = 0.0 # Multiply data usage before export (eg: convert from KBytes to Bytes)
@@ -121,7 +120,8 @@
[session_manager]
# enabled = false # Starts SessionManager service: <true|false>
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>
# rater = internal # Address where to reach the Rater
# rater = internal # Address where to reach the Rater <""|internal|127.0.0.1:2013>
# cdrs = # Address where to reach CDR Server, empty to disable CDR capturing <""|internal|127.0.0.1:2013>
# reconnects = 3 # Number of reconnects to rater/cdrs before giving up.
# debit_interval = 10 # Interval to perform debits on.
# min_call_duration = 0s # Only authorize calls with allowed duration bigger than this
@@ -140,7 +140,6 @@
# listen_udp = 127.0.0.1:2020 # Address where to listen for datagram events coming from OpenSIPS
# mi_addr = 127.0.0.1:8020 # Adress where to reach OpenSIPS mi_datagram module
# events_subscribe_interval = 60s # Automatic events subscription to OpenSIPS, 0 to disable it
# cdrs = internal # Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013>
# reconnects = 3 # Number of attempts on connect failure.
[derived_charging]

View File

@@ -43,4 +43,5 @@ type Event interface {
MissingParameter() bool
ParseEventValue(*utils.RSRField) string
PassesFieldFilter(*utils.RSRField) (bool, string)
AsStoredCdr() *utils.StoredCdr
}

View File

@@ -41,7 +41,7 @@ const (
ACCOUNT = "variable_cgr_account"
DESTINATION = "variable_cgr_destination"
REQTYPE = "variable_cgr_reqtype" //prepaid or postpaid
Category = "variable_cgr_tor"
Category = "variable_cgr_category"
UUID = "Unique-ID" // -Unique ID for this call leg
CSTMID = "variable_cgr_tenant"
CALL_DEST_NR = "Caller-Destination-Number"
@@ -62,6 +62,7 @@ const (
SYSTEM_ERROR = "-SYSTEM_ERROR"
MANAGER_REQUEST = "+MANAGER_REQUEST"
USERNAME = "Caller-Username"
FS_IPv4 = "FreeSWITCH-IPv4"
)
// Nice printing for the event object.
@@ -205,6 +206,23 @@ func (fsev FSEvent) GetDuration(fieldName string) (dur time.Duration, err error)
return utils.ParseDurationWithSecs(durStr)
}
func (fsev FSEvent) GetOriginatorIP(fieldName string) string {
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
} else if fieldName == utils.META_DEFAULT {
return fsev[FS_IPv4]
}
return utils.FirstNonEmpty(fsev[fieldName], fsev[FS_IPv4])
}
func (fsev FSEvent) GetExtraFields() map[string]string {
var extraFields map[string]string
for _, fldRule := range config.CgrConfig().CDRSExtraFields {
extraFields[fldRule.Id] = fsev.ParseEventValue(fldRule)
}
return extraFields
}
// Used in derived charging and sittuations when we need to run regexp on fields
func (fsev FSEvent) ParseEventValue(rsrFld *utils.RSRField) string {
switch rsrFld.Id {
@@ -270,3 +288,25 @@ func (fsev FSEvent) PassesFieldFilter(fieldFilter *utils.RSRField) (bool, string
}
return false, ""
}
func (fsev FSEvent) AsStoredCdr() *utils.StoredCdr {
storCdr := new(utils.StoredCdr)
storCdr.CgrId = fsev.GetCgrId()
storCdr.TOR = utils.VOICE
storCdr.AccId = fsev.GetUUID()
storCdr.CdrHost = fsev.GetOriginatorIP(utils.META_DEFAULT)
storCdr.CdrSource = "FS_" + fsev.GetName()
storCdr.ReqType = fsev.GetReqType(utils.META_DEFAULT)
storCdr.Direction = fsev.GetDirection(utils.META_DEFAULT)
storCdr.Tenant = fsev.GetTenant(utils.META_DEFAULT)
storCdr.Category = fsev.GetCategory(utils.META_DEFAULT)
storCdr.Account = fsev.GetAccount(utils.META_DEFAULT)
storCdr.Subject = fsev.GetSubject(utils.META_DEFAULT)
storCdr.Destination = fsev.GetDestination(utils.META_DEFAULT)
storCdr.SetupTime, _ = fsev.GetSetupTime(utils.META_DEFAULT)
storCdr.AnswerTime, _ = fsev.GetAnswerTime(utils.META_DEFAULT)
storCdr.Usage, _ = fsev.GetDuration(utils.META_DEFAULT)
storCdr.ExtraFields = fsev.GetExtraFields()
storCdr.Cost = -1
return storCdr
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"reflect"
"testing"
"time"
@@ -610,3 +611,19 @@ Caller-Username: 04021292812`
t.Error("Should not pass filter")
}
}
func TestFsEvAsStoredCdr(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
config.SetCgrConfig(cfg)
ev := new(FSEvent).New(hangupEv)
setupTime, _ := utils.ParseTimeDetectLayout("1398442107")
aTime, _ := utils.ParseTimeDetectLayout("1398442120")
eStoredCdr := &utils.StoredCdr{CgrId: utils.Sha1("37e9b766-5256-4e4b-b1ed-3767b930fec8", setupTime.UTC().String()),
TOR: utils.VOICE, AccId: "37e9b766-5256-4e4b-b1ed-3767b930fec8", CdrHost: "10.0.2.15", CdrSource: "FS_CHANNEL_HANGUP_COMPLETE", ReqType: utils.PSEUDOPREPAID,
Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003",
Destination: "1002", SetupTime: setupTime, AnswerTime: aTime,
Usage: time.Duration(5) * time.Second, Cost: -1}
if storedCdr := ev.AsStoredCdr(); !reflect.DeepEqual(eStoredCdr, storedCdr) {
t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr)
}
}

View File

@@ -40,14 +40,16 @@ type FSSessionManager struct {
conn net.Conn
buf *bufio.Reader
sessions []*Session
connector engine.Connector
rater engine.Connector
cdrs engine.Connector
debitPeriod time.Duration
loggerDB engine.LogStorage
}
func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager {
func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, rater, cdrs engine.Connector, debitPeriod time.Duration) *FSSessionManager {
cfg = cgrCfg // make config global
return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod}
return &FSSessionManager{loggerDB: storage, rater: rater, cdrs: cdrs, debitPeriod: debitPeriod}
}
// Connects to the freeswitch mod_event_socket server and starts
@@ -164,7 +166,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination
return
@@ -203,7 +205,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat)
err = sm.rater.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination
@@ -237,7 +239,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT),
Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR, "") // Disconnect the session since we are not able to process sessions
return
@@ -260,7 +262,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT),
Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)}
var dcs utils.DerivedChargers
if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error()))
return
}
@@ -330,7 +332,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
Increments: refundIncrements,
}
var response float64
err := sm.connector.RefundIncrements(*cd, &response)
err := sm.rater.RefundIncrements(*cd, &response)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err))
}
@@ -338,7 +340,13 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
lastCC.Timespans.Compress()
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
}
if sm.cdrs != nil {
var reply string
storedCdr := ev.AsStoredCdr()
if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
}
}
}
@@ -347,7 +355,7 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
}
func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
return sm.connector.MaxDebit(*cd, cc)
return sm.rater.MaxDebit(*cd, cc)
}
func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {