From 71a932e343ff2e729d6e0cd32dd1ead30b5ee8ce Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Aug 2014 16:03:02 +0200 Subject: [PATCH] FS Event parsing cgr_category instead of cgr_tor, zero FS configuration support for passive billing scenarios --- cmd/cgr-engine/cgr-engine.go | 43 +++++++++++++++--------------- config/config.go | 10 +++---- config/config_test.go | 4 +-- config/test_data.txt | 9 ++++--- data/conf/cgrates.cfg | 5 ++-- sessionmanager/event.go | 1 + sessionmanager/fsevent.go | 42 ++++++++++++++++++++++++++++- sessionmanager/fsevent_test.go | 17 ++++++++++++ sessionmanager/fssessionmanager.go | 28 ++++++++++++------- 9 files changed, 112 insertions(+), 47 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 56117e59f..7f5423a5d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -141,7 +141,7 @@ func startCdrc(cdrsChan chan struct{}, cdrsAddress, cdrType, cdrInDir, cdrOutDir } func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) { - var raterConn engine.Connector + var raterConn, cdrsConn engine.Connector var client *rpcclient.RpcClient if cfg.SMRater == utils.INTERNAL { <-cacheChan // Wait for the cache to init before start doing queries @@ -161,31 +161,30 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage } raterConn = &engine.RPCClientConnector{Client: client} } + if cfg.SMCdrS == cfg.SMRater { + cdrsConn = raterConn + } else if cfg.SMCdrS == utils.INTERNAL { + <-cacheChan // Wait for the cache to init before start doing queries + cdrsConn = responder + } else { + for i := 0; i < cfg.SMReconnects; i++ { + client, err = rpcclient.NewRpcClient("tcp", cfg.SMCdrS, 0, cfg.SMReconnects, utils.GOB) + if err == nil { //Connected so no need to reiterate + break + } + time.Sleep(time.Duration(i+1) * time.Second) + } + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + } + cdrsConn = &engine.RPCClientConnector{Client: client} + } switch cfg.SMSwitchType { case FS: dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)) - sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, dp) + sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, raterConn, cdrsConn, dp) case OSIPS: - var cdrsConn engine.Connector - if cfg.OsipCDRS == cfg.SMRater { - cdrsConn = raterConn - } else if cfg.OsipCDRS == utils.INTERNAL { - <-cacheChan // Wait for the cache to init before start doing queries - cdrsConn = responder - } else { - for i := 0; i < cfg.OsipsReconnects; i++ { - client, err = rpcclient.NewRpcClient("tcp", cfg.OsipCDRS, 0, cfg.SMReconnects, utils.GOB) - if err == nil { //Connected so no need to reiterate - break - } - time.Sleep(time.Duration(i+1) * time.Second) - } - if err != nil { - engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) - exitChan <- true - } - cdrsConn = &engine.RPCClientConnector{Client: client} - } sm, _ = sessionmanager.NewOSipsSessionManager(cfg, raterConn, cdrsConn) default: engine.Logger.Err(fmt.Sprintf(" Unsupported session manger type: %s!", cfg.SMSwitchType)) diff --git a/config/config.go b/config/config.go index f1e6befc3..1d12ba650 100644 --- a/config/config.go +++ b/config/config.go @@ -108,6 +108,7 @@ type CGRConfig struct { SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer + SMCdrS string // SMReconnects int // Number of reconnect attempts to rater SMDebitInterval int // the period to be debited in advanced during a call (in seconds) SMMaxCallDuration time.Duration // The maximum duration of a call @@ -129,7 +130,6 @@ type CGRConfig struct { OsipsListenUdp string // Address where to listen for event datagrams coming from OpenSIPS OsipsMiAddr string // Adress where to reach OpenSIPS mi_datagram module OsipsEvSubscInterval time.Duration // Refresh event subscription at this interval - OsipCDRS string // Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013> OsipsReconnects int // Number of attempts on connect failure. HistoryAgentEnabled bool // Starts History as an agent: . HistoryServer string // Address where to reach the master history server: @@ -217,6 +217,7 @@ func (self *CGRConfig) setDefaults() error { self.SMEnabled = false self.SMSwitchType = FS self.SMRater = utils.INTERNAL + self.SMCdrS = "" self.SMReconnects = 3 self.SMDebitInterval = 10 self.SMMaxCallDuration = time.Duration(3) * time.Hour @@ -231,7 +232,6 @@ func (self *CGRConfig) setDefaults() error { self.OsipsListenUdp = "127.0.0.1:2020" self.OsipsMiAddr = "127.0.0.1:8020" self.OsipsEvSubscInterval = time.Duration(60) * time.Second - self.OsipCDRS = "internal" self.OsipsReconnects = 3 self.HistoryAgentEnabled = false self.HistoryServerEnabled = false @@ -554,6 +554,9 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("session_manager", "rater"); hasOpt { cfg.SMRater, _ = c.GetString("session_manager", "rater") } + if hasOpt = c.HasOption("session_manager", "cdrs"); hasOpt { + cfg.SMCdrS, _ = c.GetString("session_manager", "cdrs") + } if hasOpt = c.HasOption("session_manager", "reconnects"); hasOpt { cfg.SMReconnects, _ = c.GetInt("session_manager", "reconnects") } @@ -608,9 +611,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { return nil, err } } - if hasOpt = c.HasOption("opensips", "cdrs"); hasOpt { - cfg.OsipCDRS, _ = c.GetString("opensips", "cdrs") - } if hasOpt = c.HasOption("opensips", "reconnects"); hasOpt { cfg.OsipsReconnects, _ = c.GetInt("opensips", "reconnects") } diff --git a/config/config_test.go b/config/config_test.go index 4a0cfca0a..3a287ab29 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -117,6 +117,7 @@ func TestDefaults(t *testing.T) { eCfg.SMEnabled = false eCfg.SMSwitchType = FS eCfg.SMRater = utils.INTERNAL + eCfg.SMCdrS = "" eCfg.SMReconnects = 3 eCfg.SMDebitInterval = 10 eCfg.SMMinCallDuration = time.Duration(0) @@ -131,7 +132,6 @@ func TestDefaults(t *testing.T) { eCfg.OsipsListenUdp = "127.0.0.1:2020" eCfg.OsipsMiAddr = "127.0.0.1:8020" eCfg.OsipsEvSubscInterval = time.Duration(60) * time.Second - eCfg.OsipCDRS = "internal" eCfg.OsipsReconnects = 3 eCfg.DerivedChargers = make(utils.DerivedChargers, 0) eCfg.CombinedDerivedChargers = true @@ -274,6 +274,7 @@ func TestConfigFromFile(t *testing.T) { eCfg.SMEnabled = true eCfg.SMSwitchType = "test" eCfg.SMRater = "test" + eCfg.SMCdrS = "test" eCfg.SMReconnects = 99 eCfg.SMDebitInterval = 99 eCfg.SMMinCallDuration = time.Duration(98) * time.Second @@ -288,7 +289,6 @@ func TestConfigFromFile(t *testing.T) { eCfg.OsipsListenUdp = "test" eCfg.OsipsMiAddr = "test" eCfg.OsipsEvSubscInterval = time.Duration(99) * time.Second - eCfg.OsipCDRS = "test" eCfg.OsipsReconnects = 99 eCfg.DerivedChargers = utils.DerivedChargers{&utils.DerivedCharger{RunId: "test", RunFilters: "", ReqTypeField: "test", DirectionField: "test", TenantField: "test", CategoryField: "test", AccountField: "test", SubjectField: "test", DestinationField: "test", SetupTimeField: "test", AnswerTimeField: "test", UsageField: "test"}} diff --git a/config/test_data.txt b/config/test_data.txt index 9793ed00e..b405bcc9a 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -94,7 +94,7 @@ enabled = true # Start the CDR stats service: . queue_length = 99 # Number of items in the stats buffer time_window = 99 # Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow metrics = test # Stat metric ids to build -setup_interval = # Filter on CDR SetupTime +setup_interval = # Filter on CDR SetupTime tors = test # Filter on CDR TOR fields cdr_hosts= test # Filter on CDR CdrHost fields cdr_sources = test # Filter on CDR CdrSource fields @@ -105,16 +105,17 @@ categories = test # Filter on CDR Category fields accounts = test # Filter on CDR Account fields subjects = test # Filter on CDR Subject fields destination_prefixes = test # Filter on CDR Destination prefixes -usage_interval = 99 # Filter on CDR Usage -mediation_run_ids = test # Filter on CDR MediationRunId fields +usage_interval = 99 # Filter on CDR Usage +mediation_run_ids = test # Filter on CDR MediationRunId fields rated_accounts = test # Filter on CDR RatedAccount fields rated_subjects = test # Filter on CDR RatedSubject fields -cost_intervals = 99 # Filter on CDR Cost +cost_intervals = 99 # Filter on CDR Cost [session_manager] enabled = true # Starts SessionManager service: . switch_type = test # Defines the type of switch behind: . rater = test # Address where to reach the Rater. +cdrs = test # Address where to reach CDR Server, empty to disable CDR capturing <""|internal|127.0.0.1:2013> reconnects = 99 # Number of reconnects to rater before giving up. debit_interval = 99 # Interval to perform debits on. min_call_duration = 98 # Only authorize calls with allowed duration bigger than this diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 0cc68987c..ac074fb2f 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -52,7 +52,6 @@ # cdrstats = # Address where to reach the cdrstats service: # store_disable = false # When true, CDRs will not longer be saved in stordb, useful for cdrstats only scenario - [cdre] # cdr_format = csv # Exported CDRs format # data_usage_multiply_factor = 0.0 # Multiply data usage before export (eg: convert from KBytes to Bytes) @@ -121,7 +120,8 @@ [session_manager] # enabled = false # Starts SessionManager service: # switch_type = freeswitch # Defines the type of switch behind: -# rater = internal # Address where to reach the Rater +# rater = internal # Address where to reach the Rater <""|internal|127.0.0.1:2013> +# cdrs = # Address where to reach CDR Server, empty to disable CDR capturing <""|internal|127.0.0.1:2013> # reconnects = 3 # Number of reconnects to rater/cdrs before giving up. # debit_interval = 10 # Interval to perform debits on. # min_call_duration = 0s # Only authorize calls with allowed duration bigger than this @@ -140,7 +140,6 @@ # listen_udp = 127.0.0.1:2020 # Address where to listen for datagram events coming from OpenSIPS # mi_addr = 127.0.0.1:8020 # Adress where to reach OpenSIPS mi_datagram module # events_subscribe_interval = 60s # Automatic events subscription to OpenSIPS, 0 to disable it -# cdrs = internal # Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013> # reconnects = 3 # Number of attempts on connect failure. [derived_charging] diff --git a/sessionmanager/event.go b/sessionmanager/event.go index b7db11c4a..5e36d6be7 100644 --- a/sessionmanager/event.go +++ b/sessionmanager/event.go @@ -43,4 +43,5 @@ type Event interface { MissingParameter() bool ParseEventValue(*utils.RSRField) string PassesFieldFilter(*utils.RSRField) (bool, string) + AsStoredCdr() *utils.StoredCdr } diff --git a/sessionmanager/fsevent.go b/sessionmanager/fsevent.go index 71e413ce0..542aeda7f 100644 --- a/sessionmanager/fsevent.go +++ b/sessionmanager/fsevent.go @@ -41,7 +41,7 @@ const ( ACCOUNT = "variable_cgr_account" DESTINATION = "variable_cgr_destination" REQTYPE = "variable_cgr_reqtype" //prepaid or postpaid - Category = "variable_cgr_tor" + Category = "variable_cgr_category" UUID = "Unique-ID" // -Unique ID for this call leg CSTMID = "variable_cgr_tenant" CALL_DEST_NR = "Caller-Destination-Number" @@ -62,6 +62,7 @@ const ( SYSTEM_ERROR = "-SYSTEM_ERROR" MANAGER_REQUEST = "+MANAGER_REQUEST" USERNAME = "Caller-Username" + FS_IPv4 = "FreeSWITCH-IPv4" ) // Nice printing for the event object. @@ -205,6 +206,23 @@ func (fsev FSEvent) GetDuration(fieldName string) (dur time.Duration, err error) return utils.ParseDurationWithSecs(durStr) } +func (fsev FSEvent) GetOriginatorIP(fieldName string) string { + if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value + return fieldName[len(utils.STATIC_VALUE_PREFIX):] + } else if fieldName == utils.META_DEFAULT { + return fsev[FS_IPv4] + } + return utils.FirstNonEmpty(fsev[fieldName], fsev[FS_IPv4]) +} + +func (fsev FSEvent) GetExtraFields() map[string]string { + var extraFields map[string]string + for _, fldRule := range config.CgrConfig().CDRSExtraFields { + extraFields[fldRule.Id] = fsev.ParseEventValue(fldRule) + } + return extraFields +} + // Used in derived charging and sittuations when we need to run regexp on fields func (fsev FSEvent) ParseEventValue(rsrFld *utils.RSRField) string { switch rsrFld.Id { @@ -270,3 +288,25 @@ func (fsev FSEvent) PassesFieldFilter(fieldFilter *utils.RSRField) (bool, string } return false, "" } + +func (fsev FSEvent) AsStoredCdr() *utils.StoredCdr { + storCdr := new(utils.StoredCdr) + storCdr.CgrId = fsev.GetCgrId() + storCdr.TOR = utils.VOICE + storCdr.AccId = fsev.GetUUID() + storCdr.CdrHost = fsev.GetOriginatorIP(utils.META_DEFAULT) + storCdr.CdrSource = "FS_" + fsev.GetName() + storCdr.ReqType = fsev.GetReqType(utils.META_DEFAULT) + storCdr.Direction = fsev.GetDirection(utils.META_DEFAULT) + storCdr.Tenant = fsev.GetTenant(utils.META_DEFAULT) + storCdr.Category = fsev.GetCategory(utils.META_DEFAULT) + storCdr.Account = fsev.GetAccount(utils.META_DEFAULT) + storCdr.Subject = fsev.GetSubject(utils.META_DEFAULT) + storCdr.Destination = fsev.GetDestination(utils.META_DEFAULT) + storCdr.SetupTime, _ = fsev.GetSetupTime(utils.META_DEFAULT) + storCdr.AnswerTime, _ = fsev.GetAnswerTime(utils.META_DEFAULT) + storCdr.Usage, _ = fsev.GetDuration(utils.META_DEFAULT) + storCdr.ExtraFields = fsev.GetExtraFields() + storCdr.Cost = -1 + return storCdr +} diff --git a/sessionmanager/fsevent_test.go b/sessionmanager/fsevent_test.go index cec0dcc66..c6e82a5c8 100644 --- a/sessionmanager/fsevent_test.go +++ b/sessionmanager/fsevent_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package sessionmanager import ( + "reflect" "testing" "time" @@ -610,3 +611,19 @@ Caller-Username: 04021292812` t.Error("Should not pass filter") } } + +func TestFsEvAsStoredCdr(t *testing.T) { + cfg, _ = config.NewDefaultCGRConfig() + config.SetCgrConfig(cfg) + ev := new(FSEvent).New(hangupEv) + setupTime, _ := utils.ParseTimeDetectLayout("1398442107") + aTime, _ := utils.ParseTimeDetectLayout("1398442120") + eStoredCdr := &utils.StoredCdr{CgrId: utils.Sha1("37e9b766-5256-4e4b-b1ed-3767b930fec8", setupTime.UTC().String()), + TOR: utils.VOICE, AccId: "37e9b766-5256-4e4b-b1ed-3767b930fec8", CdrHost: "10.0.2.15", CdrSource: "FS_CHANNEL_HANGUP_COMPLETE", ReqType: utils.PSEUDOPREPAID, + Direction: utils.OUT, Tenant: "cgrates.org", Category: "call", Account: "1003", Subject: "1003", + Destination: "1002", SetupTime: setupTime, AnswerTime: aTime, + Usage: time.Duration(5) * time.Second, Cost: -1} + if storedCdr := ev.AsStoredCdr(); !reflect.DeepEqual(eStoredCdr, storedCdr) { + t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr) + } +} diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 8632068b0..21b2c656c 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -40,14 +40,16 @@ type FSSessionManager struct { conn net.Conn buf *bufio.Reader sessions []*Session - connector engine.Connector + rater engine.Connector + cdrs engine.Connector debitPeriod time.Duration loggerDB engine.LogStorage } -func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager { +func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, rater, cdrs engine.Connector, debitPeriod time.Duration) *FSSessionManager { + cfg = cgrCfg // make config global - return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod} + return &FSSessionManager{loggerDB: storage, rater: rater, cdrs: cdrs, debitPeriod: debitPeriod} } // Connects to the freeswitch mod_event_socket server and starts @@ -164,7 +166,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers - if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil { + if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { engine.Logger.Err(fmt.Sprintf(" OnPark: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(utils.META_DEFAULT), SYSTEM_ERROR) // We unpark on original destination return @@ -203,7 +205,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { TimeEnd: startTime.Add(cfg.SMMaxCallDuration), } var remainingDurationFloat float64 - err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) + err = sm.rater.GetMaxSessionTime(cd, &remainingDurationFloat) if err != nil { engine.Logger.Err(fmt.Sprintf("Could not get max session time for %s: %v", ev.GetUUID(), err)) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(""), SYSTEM_ERROR) // We unpark on original destination @@ -237,7 +239,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) { attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers - if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil { + if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { engine.Logger.Err(fmt.Sprintf(" OnAnswer: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) sm.DisconnectSession(ev.GetUUID(), SYSTEM_ERROR, "") // Disconnect the session since we are not able to process sessions return @@ -260,7 +262,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { attrsDC := utils.AttrDerivedChargers{Tenant: ev.GetTenant(utils.META_DEFAULT), Category: ev.GetCategory(utils.META_DEFAULT), Direction: ev.GetDirection(utils.META_DEFAULT), Account: ev.GetAccount(utils.META_DEFAULT), Subject: ev.GetSubject(utils.META_DEFAULT)} var dcs utils.DerivedChargers - if err := sm.connector.GetDerivedChargers(attrsDC, &dcs); err != nil { + if err := sm.rater.GetDerivedChargers(attrsDC, &dcs); err != nil { engine.Logger.Err(fmt.Sprintf(" OnHangup: could not get derived charging for event %s: %s", ev.GetUUID(), err.Error())) return } @@ -330,7 +332,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { Increments: refundIncrements, } var response float64 - err := sm.connector.RefundIncrements(*cd, &response) + err := sm.rater.RefundIncrements(*cd, &response) if err != nil { engine.Logger.Err(fmt.Sprintf("Debit cents failed: %v", err)) } @@ -338,7 +340,13 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { cost := refundIncrements.GetTotalCost() lastCC.Cost -= cost lastCC.Timespans.Compress() - // engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) + } + if sm.cdrs != nil { + var reply string + storedCdr := ev.AsStoredCdr() + if err := sm.cdrs.ProcessCdr(storedCdr, &reply); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error())) + } } } @@ -347,7 +355,7 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration { } func (sm *FSSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error { - return sm.connector.MaxDebit(*cd, cc) + return sm.rater.MaxDebit(*cd, cc) } func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {