From f6758d0d0f7852efd1003c815d3af7da381c3c57 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 12 Jan 2014 19:34:56 +0200 Subject: [PATCH 01/10] More fixes for debit loop call duration --- engine/calldesc.go | 5 +++++ sessionmanager/fssessionmanager.go | 4 +--- sessionmanager/session.go | 12 +++++++++--- sessionmanager/sessionmanager.go | 5 +++-- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index ceca357bc..817b3ce86 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -368,6 +368,11 @@ func (cd *CallDescriptor) roundTimeSpansToIncrement(timespans TimeSpans) []*Time return timespans } +// Returns call descripor's total duration +func (cd *CallDescriptor) GetDuration() time.Duration { + return cd.TimeEnd.Sub(cd.TimeStart) +} + /* Creates a CallCost structure with the cost information calculated for the received CallDescriptor. */ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e6c3cfc70..f9b28d65d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -297,10 +297,8 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } -func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor, index float64) (cc *engine.CallCost) { +func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (cc *engine.CallCost) { cc = &engine.CallCost{} - cd.LoopIndex = index - cd.CallDuration += sm.debitPeriod err := sm.connector.MaxDebit(*cd, cc) if err != nil { engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err)) diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 2af31f6f8..40b57cc42 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -77,6 +77,7 @@ func NewSession(ev Event, sm SessionManager) (s *Session) { func (s *Session) startDebitLoop() { nextCd := *s.callDescriptor index := 0.0 + debitPeriod := s.sessionManager.GetDebitPeriod() for { select { case <-s.stopDebit: @@ -86,9 +87,14 @@ func (s *Session) startDebitLoop() { if index > 0 { // first time use the session start time nextCd.TimeStart = nextCd.TimeEnd } - nextCd.TimeEnd = nextCd.TimeStart.Add(s.sessionManager.GetDebitPeriod()) - cc := s.sessionManager.LoopAction(s, &nextCd, index) - nextCd.TimeEnd = cc.GetEndTime() + nextCd.TimeEnd = nextCd.TimeStart.Add(debitPeriod) + nextCd.LoopIndex = index + nextCd.CallDuration += debitPeriod // first presumed duration + cc := s.sessionManager.LoopAction(s, &nextCd) + nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd + // update call duration with real debited duration + nextCd.CallDuration -= debitPeriod + nextCd.CallDuration += nextCd.GetDuration() time.Sleep(cc.GetDuration()) index++ } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 9eb2f68ae..ea0074786 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -19,16 +19,17 @@ along with this program. If not, see package sessionmanager import ( + "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "time" ) type SessionManager interface { Connect(*config.CGRConfig) error DisconnectSession(*Session, string) RemoveSession(*Session) - LoopAction(*Session, *engine.CallDescriptor, float64) *engine.CallCost + LoopAction(*Session, *engine.CallDescriptor) *engine.CallCost GetDebitPeriod() time.Duration GetDbLogger() engine.LogStorage Shutdown() error From 8a323544e8d223d9e3650f902a8525ea6a983125 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jan 2014 18:37:18 +0100 Subject: [PATCH 02/10] Adding SMCallMaxDuration setting --- config/config.go | 8 +++++ config/config_test.go | 2 ++ config/test_data.txt | 5 +-- data/conf/cgrates.cfg | 3 +- .../mysql/create_costdetails_tables.sql | 2 +- .../etc/freeswitch/dialplan/default.xml | 6 ++-- docs/tut_freeswitch_csv.rst | 30 +++++++++++++++--- engine/storage_sql.go | 8 ++--- sessionmanager/fssessionmanager.go | 31 +++++++++++++------ 9 files changed, 70 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index 422906a6f..9920290e4 100644 --- a/config/config.go +++ b/config/config.go @@ -116,6 +116,7 @@ type CGRConfig struct { SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer SMRaterReconnects int // Number of reconnect attempts to rater SMDebitInterval int // the period to be debited in advanced during a call (in seconds) + SMMaxCallDuration time.Duration // The maximum duration of a call MediatorEnabled bool // Starts Mediator service: . MediatorListen string // Mediator's listening interface: . MediatorRater string // Address where to reach the Rater: @@ -219,6 +220,7 @@ func (self *CGRConfig) setDefaults() error { self.SMRater = "127.0.0.1:2012" self.SMRaterReconnects = 3 self.SMDebitInterval = 10 + self.SMMaxCallDuration = time.Duration(3) * time.Hour self.FreeswitchServer = "127.0.0.1:8021" self.FreeswitchPass = "ClueCon" self.FreeswitchReconnects = 5 @@ -519,6 +521,12 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) { if hasOpt = c.HasOption("session_manager", "debit_interval"); hasOpt { cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval") } + if hasOpt = c.HasOption("session_manager", "max_call_duration"); hasOpt { + maxCallDurStr,_ := c.GetString("session_manager", "max_call_duration") + if cfg.SMMaxCallDuration, errParse = utils.ParseDurationWithSecs(maxCallDurStr); errParse != nil { + return nil, errParse + } + } if hasOpt = c.HasOption("freeswitch", "server"); hasOpt { cfg.FreeswitchServer, _ = c.GetString("freeswitch", "server") } diff --git a/config/config_test.go b/config/config_test.go index 99ddbf056..5dd1d6f38 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -123,6 +123,7 @@ func TestDefaults(t *testing.T) { eCfg.SMRater = "127.0.0.1:2012" eCfg.SMRaterReconnects = 3 eCfg.SMDebitInterval = 10 + eCfg.SMMaxCallDuration = time.Time(3) * time.Hour eCfg.FreeswitchServer = "127.0.0.1:8021" eCfg.FreeswitchPass = "ClueCon" eCfg.FreeswitchReconnects = 5 @@ -245,6 +246,7 @@ func TestConfigFromFile(t *testing.T) { eCfg.SMRater = "test" eCfg.SMRaterReconnects = 99 eCfg.SMDebitInterval = 99 + eCfg.SMMaxCallDuration = "test" eCfg.FreeswitchServer = "test" eCfg.FreeswitchPass = "test" eCfg.FreeswitchReconnects = 99 diff --git a/config/test_data.txt b/config/test_data.txt index 2636b4474..f3372761a 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -94,8 +94,9 @@ duration_fields = test # Name of duration fields to be used during mediation. enabled = true # Starts SessionManager service: . switch_type = test # Defines the type of switch behind: . rater = test # Address where to reach the Rater. -rater_reconnects = 99 # Number of reconnects to rater before giving up. -debit_interval = 99 # Interval to perform debits on. +rater_reconnects = 99 # Number of reconnects to rater before giving up. +debit_interval = 99 # Interval to perform debits on. +max_call_duration = test # Maximum call duration a prepaid call can last [freeswitch] server = test # Adress where to connect to FreeSWITCH socket. diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index d2724292f..1af1f445f 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -97,7 +97,8 @@ # switch_type = freeswitch # Defines the type of switch behind: . # rater = 127.0.0.1:2012 # Address where to reach the Rater. # rater_reconnects = 3 # Number of reconnects to rater before giving up. -# debit_interval = 5 # Interval to perform debits on. +# debit_interval = 10 # Interval to perform debits on. +# max_call_duration = 3h # Maximum call duration a prepaid call can last [freeswitch] # server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket. diff --git a/data/storage/mysql/create_costdetails_tables.sql b/data/storage/mysql/create_costdetails_tables.sql index d55cb7a34..774bd27cc 100644 --- a/data/storage/mysql/create_costdetails_tables.sql +++ b/data/storage/mysql/create_costdetails_tables.sql @@ -14,8 +14,8 @@ CREATE TABLE `cost_details` ( `account` varchar(128) NOT NULL, `subject` varchar(128) NOT NULL, `destination` varchar(128) NOT NULL, - `cost` DECIMAL(20,4) NOT NULL, `connect_fee` DECIMAL(5,4) NOT NULL, + `cost` DECIMAL(20,4) NOT NULL, `timespans` text, `source` varchar(64) NOT NULL, `runid` varchar(64) NOT NULL, diff --git a/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml b/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml index 8bc0ea41b..cfdc0a9e4 100644 --- a/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml +++ b/data/tutorials/fs_csv/freeswitch/etc/freeswitch/dialplan/default.xml @@ -254,7 +254,7 @@ --> - + @@ -262,7 +262,7 @@ - + @@ -272,7 +272,7 @@ - + diff --git a/docs/tut_freeswitch_csv.rst b/docs/tut_freeswitch_csv.rst index 70b57caed..654e6e4a0 100644 --- a/docs/tut_freeswitch_csv.rst +++ b/docs/tut_freeswitch_csv.rst @@ -13,10 +13,10 @@ Scenario - **CGRateS** with following components: - CGR-SM started as prepaid controller, with debits taking place at 5s intervals. - - CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR (moving the processed *.csv* files to */tmp* folder). - - CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC. - - CGR-CDRE exporting mediated CDRs (export path: */tmp*). - - CGR-History component keeping the archive of the rates modifications (path: */tmp/cgr_history*). + - CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR and moving the processed *.csv* files to */tmp* folder. + - CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC inside CGR StorDB. + - CGR-CDRE exporting mediated CDRs from CGR StorDB (export path: */tmp*). + - CGR-History component keeping the archive of the rates modifications (path browsable with git client at */tmp/cgr_history*). Starting FreeSWITCH_ with custom configuration @@ -103,14 +103,34 @@ To verify that all actions successfully performed, we use following *cgr-console Test calls ---------- -Calling between 1001 and 1003 should generate prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. The difference between calling from 1001 or 1003 should be reflected in fact that 1001 will generate real-time debits as opposite to 1003 which will only generate debits when CDRs will be processed. + +1001 -> 1002 +~~~~~~~~~~~~ + +Since the user 1001 is marked as prepaid inside FreeSWITCH_ directory configuration, calling between 1001 and 1002 should generate prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. As per our tariff plans, we should get first 60s charged as a whole, then in intervals of 1s (configured SessionManager debit interval of 10s). + +*Note*: An important particularity to note here is the ability of **CGRateS** SessionManager to refund units booked in advance (eg: if debit occurs every 10s and rate increments are set to 1s, the SessionManager will be smart enough to refund pre-booked credits for calls stoped in the middle of debit interval). + +Check that 1001 balance is properly debitted, during the call: :: cgr-console get_balance cgrates.org 1001 + + +1002 -> 1001 +~~~~~~~~~~~~ + +The user 1002 is marked as postpaid inside FreeSWITCH_ hence his calls will be debited at the end of the call instead of during a call and his balance will be able to go on negative without influencing his calls. + +To check that we had debits we use again console command, this time not during the call but at the end of it: + +:: + cgr-console get_balance cgrates.org 1002 + CDR processing -------------- diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 539c41a29..0f4e34b78 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -646,7 +646,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')", + _, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')", utils.TBL_COST_DETAILS, utils.FSCgrId(uuid), uuid, @@ -656,8 +656,8 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e cc.Account, cc.Subject, cc.Destination, - cc.Cost, cc.ConnectFee, + cc.Cost, tss, source, runid)) @@ -668,12 +668,12 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e } func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) var accid, src string var timespansJson string cc = &CallCost{Cost: -1} err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Account, &cc.Subject, - &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson, &src) + &cc.Destination, &cc.ConnectFee, &cc.Cost, ×pansJson, &src) if err = json.Unmarshal([]byte(timespansJson), &cc.Timespans); err != nil { return nil, err } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e6c3cfc70..fcc10e50d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -101,7 +101,7 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session { // Disconnects a session by sending hangup command to freeswitch func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) { - engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid)) + // engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid)) err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify)) if err != nil { engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err)) @@ -123,6 +123,17 @@ func (sm *FSSessionManager) RemoveSession(s *Session) { } } +// Sets the call timeout valid of starting of the call +func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error { + err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid)) + if err != nil { + engine.Logger.Err("could not send sched_hangup command to freeswitch") + return err + } + return nil +} + + // Sends the transfer command to unpark the call to freeswitch func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) { err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify)) @@ -140,7 +151,7 @@ func (sm *FSSessionManager) OnHeartBeat(ev Event) { } func (sm *FSSessionManager) OnChannelPark(ev Event) { - engine.Logger.Info("freeswitch park") + //engine.Logger.Info("freeswitch park") startTime, err := ev.GetStartTime(PARK_TIME) if err != nil { engine.Logger.Err("Error parsing answer event start time, using time.Now!") @@ -162,8 +173,9 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { Subject: ev.GetSubject(), Account: ev.GetAccount(), Destination: ev.GetDestination(), - Amount: sm.debitPeriod.Seconds(), - TimeStart: startTime} + TimeStart: startTime, + TimeEnd: startTime.Add(cfg.SMMaxCallDuration), + } var remainingDurationFloat float64 err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat) if err != nil { @@ -172,17 +184,18 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { return } remainingDuration := time.Duration(remainingDurationFloat) - engine.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingDuration)) + //engine.Logger.Info(fmt.Sprintf("Remaining duration: %v", remainingDuration)) if remainingDuration == 0 { - engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) + //engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject))) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), INSUFFICIENT_FUNDS) return } + sm.setMaxCallDuration(ev.GetUUID(), remainingDuration) sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), AUTH_OK) } func (sm *FSSessionManager) OnChannelAnswer(ev Event) { - engine.Logger.Info(" FreeSWITCH answer.") + //engine.Logger.Info(" FreeSWITCH answer.") // Make sure cgr_type is enforced even if not set by FreeSWITCH if err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType())); err != nil { engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err)) @@ -194,7 +207,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) { } func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { - engine.Logger.Info(" FreeSWITCH hangup.") + //engine.Logger.Info(" FreeSWITCH hangup.") s := sm.GetSession(ev.GetUUID()) if s == nil { // Not handled by us return @@ -309,7 +322,7 @@ func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor, in } engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) if cc.GetDuration() == 0 || err != nil { - engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) + // engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) sm.DisconnectSession(s, INSUFFICIENT_FUNDS) return } From c6bbd322981ff62a35be86596303ccf07f0147c6 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jan 2014 18:54:04 +0100 Subject: [PATCH 03/10] Adding pseudoprepaid in session manager un-park --- sessionmanager/fssessionmanager.go | 10 +++++----- sessionmanager/session.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 677a0ec3e..e359f86fd 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -158,8 +158,8 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) { startTime = time.Now() } // if there is no account configured leave the call alone - if strings.TrimSpace(ev.GetReqType()) != utils.PREPAID { - return + if !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, strings.TrimSpace(ev.GetReqType())) { + return // we unpark only prepaid and pseudoprepaid calls } if ev.MissingParameter() { sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), MISSING_PARAMETER) @@ -286,7 +286,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } } // show only what was actualy refunded (stopped in timespan) - engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) + // engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration)) if len(refundIncrements) > 0 { cd := &engine.CallDescriptor{ Direction: lastCC.Direction, @@ -306,7 +306,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } cost := refundIncrements.GetTotalCost() lastCC.Cost -= cost - engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) + // engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost)) } @@ -318,7 +318,7 @@ func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (c // disconnect session s.sessionManager.DisconnectSession(s, SYSTEM_ERROR) } - engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) + // engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc)) if cc.GetDuration() == 0 || err != nil { // engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s)) sm.DisconnectSession(s, INSUFFICIENT_FUNDS) diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 40b57cc42..47951e132 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -112,7 +112,7 @@ func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) { // Stops the debit loop func (s *Session) Close(ev Event) { - engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) + // engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid)) if s == nil { return } @@ -146,6 +146,6 @@ func (s *Session) SaveOperations() { engine.Logger.Err(" Error: no connection to logger database, cannot save costs") } s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID, firstCC) - engine.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) + // engine.Logger.Debug(fmt.Sprintf(" End of call, having costs: %v", firstCC.String())) }() } From 30681fa7d40e60320a96436c0d7a1464889da2b2 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jan 2014 19:15:00 +0100 Subject: [PATCH 04/10] FS-CSV Tutorial updates --- docs/tut_freeswitch_csv.rst | 30 ++++++++++++++++++++++++++---- sessionmanager/fssessionmanager.go | 2 +- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/docs/tut_freeswitch_csv.rst b/docs/tut_freeswitch_csv.rst index 654e6e4a0..6721cdbf3 100644 --- a/docs/tut_freeswitch_csv.rst +++ b/docs/tut_freeswitch_csv.rst @@ -107,7 +107,7 @@ Test calls 1001 -> 1002 ~~~~~~~~~~~~ -Since the user 1001 is marked as prepaid inside FreeSWITCH_ directory configuration, calling between 1001 and 1002 should generate prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. As per our tariff plans, we should get first 60s charged as a whole, then in intervals of 1s (configured SessionManager debit interval of 10s). +Since the user 1001 is marked as *prepaid* inside FreeSWITCH_ directory configuration, calling between 1001 and 1002 should generate pre-auth and prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. As per our tariff plans, we should get first 60s charged as a whole, then in intervals of 1s (configured SessionManager debit interval of 10s). *Note*: An important particularity to note here is the ability of **CGRateS** SessionManager to refund units booked in advance (eg: if debit occurs every 10s and rate increments are set to 1s, the SessionManager will be smart enough to refund pre-booked credits for calls stoped in the middle of debit interval). @@ -121,7 +121,7 @@ Check that 1001 balance is properly debitted, during the call: 1002 -> 1001 ~~~~~~~~~~~~ -The user 1002 is marked as postpaid inside FreeSWITCH_ hence his calls will be debited at the end of the call instead of during a call and his balance will be able to go on negative without influencing his calls. +The user 1002 is marked as *postpaid* inside FreeSWITCH_ hence his calls will be debited at the end of the call instead of during a call and his balance will be able to go on negative without influencing his new calls (no pre-auth). To check that we had debits we use again console command, this time not during the call but at the end of it: @@ -130,21 +130,43 @@ To check that we had debits we use again console command, this time not during t cgr-console get_balance cgrates.org 1002 +1003 -> 1001 +~~~~~~~~~~~~ + +The user 1003 is marked as *pseudoprepaid* inside FreeSWITCH_ hence his calls will be considered same as prepaid (no call setups possible on negative balance due to pre-auth mechanism) but not handled automatically by session manager. His call costs will be calculated directly out of CDRs and balance updated by the time when mediation process occurs. This is sometimes a good compromise of prepaid running without influencing performance (there are no recurrent call debits during a call). + +To check that there are no debits during or by the end of the call, but when the CDR is imported, run the command before and after rotating the FreeSWITCH_ *.csv* CDRs: + +:: + + cgr-console get_balance cgrates.org 1003 + + +1004 -> 1001 +~~~~~~~~~~~~ + +The user 1004 is marked as *rated* inside FreeSWITCH_ hence his calls not interact in any way with accounting subsystem. The only action perfomed by **CGRateS** related to his calls wil be rating/mediation of his CDRs. + CDR processing -------------- -For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. In order to avoid double-processing them we will use the rotate mechanism built in FreeSWITCH_. We rotate files via *fs_console* command: +For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. +In order to avoid double-processing we will use the rotate mechanism built in FreeSWITCH_. We rotate files via *fs_console* command: :: fs_cli -x "cdr_csv rotate" -On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. Once in there mediation will occur, generating the costs inside *rated_cdrs* and *cost_details* tables. +On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. As soon as the CDR will hit CDRS component, mediation will occur, either considering the costs calculated in case of prepaid and postpaid calls out of *cost_details* table or query it's own one from rater in case of *pseudoprepaid* and *rated* CDRs. Once the CDRs are mediated, can be exported as *.csv* format again via remote command offered by *cgr-console* tool: +:: + + cgr-console export_cdrs csv + .. _FreeSWITCH: http://www.freeswitch.org/ .. _Jitsi: http://www.jitsi.org/ diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index e359f86fd..e80b39b8d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -259,7 +259,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { } end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd refundDuration := end.Sub(hangupTime) - initialRefundDuration := refundDuration + //initialRefundDuration := refundDuration var refundIncrements engine.Increments for i := len(lastCC.Timespans) - 1; i >= 0; i-- { ts := lastCC.Timespans[i] From 43c097aa98cc52bc6c92d35f0b512c1ec1402a8e Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jan 2014 19:22:18 +0100 Subject: [PATCH 05/10] Fixup config --- config/config_test.go | 4 ++-- config/test_data.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 5dd1d6f38..0a70f3934 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -123,7 +123,7 @@ func TestDefaults(t *testing.T) { eCfg.SMRater = "127.0.0.1:2012" eCfg.SMRaterReconnects = 3 eCfg.SMDebitInterval = 10 - eCfg.SMMaxCallDuration = time.Time(3) * time.Hour + eCfg.SMMaxCallDuration = time.Duration(3) * time.Hour eCfg.FreeswitchServer = "127.0.0.1:8021" eCfg.FreeswitchPass = "ClueCon" eCfg.FreeswitchReconnects = 5 @@ -246,7 +246,7 @@ func TestConfigFromFile(t *testing.T) { eCfg.SMRater = "test" eCfg.SMRaterReconnects = 99 eCfg.SMDebitInterval = 99 - eCfg.SMMaxCallDuration = "test" + eCfg.SMMaxCallDuration = time.Duration(99)*time.Second eCfg.FreeswitchServer = "test" eCfg.FreeswitchPass = "test" eCfg.FreeswitchReconnects = 99 diff --git a/config/test_data.txt b/config/test_data.txt index f3372761a..0b820a81c 100644 --- a/config/test_data.txt +++ b/config/test_data.txt @@ -96,7 +96,7 @@ switch_type = test # Defines the type of switch behind: . rater = test # Address where to reach the Rater. rater_reconnects = 99 # Number of reconnects to rater before giving up. debit_interval = 99 # Interval to perform debits on. -max_call_duration = test # Maximum call duration a prepaid call can last +max_call_duration = 99 # Maximum call duration a prepaid call can last [freeswitch] server = test # Adress where to connect to FreeSWITCH socket. From 91bae964bad67289caee553675cca52fc2cc5d2a Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 12 Jan 2014 19:56:27 +0100 Subject: [PATCH 06/10] Fixup CDRC to only log on errors but not stop the server, adding freeswitch rotate cdrs script --- cdrc/cdrc.go | 2 +- data/scripts/freeswitch_cdr_csv_rotate.sh | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) create mode 100755 data/scripts/freeswitch_cdr_csv_rotate.sh diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 1dee125a3..537f29691 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -152,7 +152,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { case ev := <-watcher.Event: if ev.IsCreate() && (self.cgrCfg.CdrcCdrType != FS_CSV || path.Ext(ev.Name) != ".csv") { if err = self.processFile(ev.Name); err != nil { - return err + engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) } } case err := <-watcher.Error: diff --git a/data/scripts/freeswitch_cdr_csv_rotate.sh b/data/scripts/freeswitch_cdr_csv_rotate.sh new file mode 100755 index 000000000..09eae3564 --- /dev/null +++ b/data/scripts/freeswitch_cdr_csv_rotate.sh @@ -0,0 +1,12 @@ +#! /usr/bin/env sh + +FS_CDR_CSV_DIR=/var/log/freeswitch/cdr-csv +CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/in/csv + +/usr/bin/fs_cli -x "cdr_csv rotate" + +find $FS_CDR_CSV_DIR -maxdepth 1 -mindepth 1 -not -name *.csv -print0 | xargs -0 mv -t $CGR_CDRC_IN_DIR + +exit 0 + + From 758f399acbe6094ea5ef2bf349d6502aef77bccc Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 13 Jan 2014 13:15:16 +0100 Subject: [PATCH 07/10] Adding static fields inside cdrc config --- cdrc/cdrc.go | 65 ++++++++++++++++++++++++++--------------- cdrc/cdrc_test.go | 28 +++++++++++++----- config/config.go | 4 +-- config/config_test.go | 2 +- data/conf/cgrates.cfg | 2 +- engine/storage_mysql.go | 3 ++ 6 files changed, 69 insertions(+), 35 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 537f29691..d6e539f97 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -51,10 +51,8 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { return nil, fmt.Errorf("Folder %s does not exist", dir) } } - if cdrc.cgrCfg.CdrcCdrType == CSV { - if err := cdrc.parseFieldIndexesFromConfig(); err != nil { - return nil, err - } + if err := cdrc.parseFieldsConfig(); err != nil { + return nil, err } cdrc.httpClient = new(http.Client) return cdrc, nil @@ -62,7 +60,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) { type Cdrc struct { cgrCfg *config.CGRConfig - fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file + cfgCdrFields map[string]string // Key is the name of the field httpClient *http.Client } @@ -79,19 +77,23 @@ func (self *Cdrc) Run() error { return nil } -// Parses fieldIndex strings into fieldIndex integers needed -func (self *Cdrc) parseFieldIndexesFromConfig() error { +// Loads all fields (primary and extra) into cfgCdrFields, do some pre-checks (eg: in case of csv make sure that values are integers) +func (self *Cdrc) parseFieldsConfig() error { var err error - self.fieldIndxes = make(map[string]int) - fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION} - fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField, - self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField} - for i, strVal := range fieldIdxStrs { - if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil { - return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i]) - } - } - // Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"} + self.cfgCdrFields = map[string]string{ + utils.ACCID: self.cgrCfg.CdrcAccIdField, + utils.REQTYPE: self.cgrCfg.CdrcReqTypeField, + utils.DIRECTION: self.cgrCfg.CdrcDirectionField, + utils.TENANT: self.cgrCfg.CdrcTenantField, + utils.TOR: self.cgrCfg.CdrcTorField, + utils.ACCOUNT: self.cgrCfg.CdrcAccountField, + utils.SUBJECT: self.cgrCfg.CdrcSubjectField, + utils.DESTINATION: self.cgrCfg.CdrcDestinationField, + utils.ANSWER_TIME: self.cgrCfg.CdrcAnswerTimeField, + utils.DURATION: self.cgrCfg.CdrcDurationField, + } + + // Add extra fields here, config extra fields in the form of []string{"fieldName1:indxInCsv1","fieldName2: indexInCsv2"} for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields { splt := strings.Split(fieldWithIdx, ":") if len(splt) != 2 { @@ -100,8 +102,14 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error { if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) { return errors.New("Extra cdrc.extra_fields overwriting primary fields") } - if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil { - return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1]) + self.cfgCdrFields[splt[0]] = splt[1] + } + // Fields populated, do some sanity checks here + for cdrField, cfgVal := range self.cfgCdrFields { + if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) && !strings.HasPrefix(cfgVal, utils.STATIC_VALUE_PREFIX) { + if _, err = strconv.Atoi(cfgVal); err != nil { + return fmt.Errorf("Cannot parse configuration field %s into integer", cdrField) + } } } return nil @@ -112,11 +120,22 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) { // engine.Logger.Info(fmt.Sprintf("Processing record %v", record)) v := url.Values{} v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId) - for fldName, idx := range self.fieldIndxes { - if len(record) <= idx { - return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName) + for cfgFieldName, cfgFieldVal := range self.cfgCdrFields { + var fieldVal string + if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) { + fieldVal = cfgFieldVal[1:] + } else if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) { + if cfgFieldIdx, err := strconv.Atoi(cfgFieldVal); err != nil { // Should in theory never happen since we have already parsed config + return nil, err + } else if len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cfgFieldName) + } else { + fieldVal = record[cfgFieldIdx] + } + } else { // Modify here when we add more supported cdr formats + fieldVal = "UNKNOWN" } - v.Set(fldName, record[idx]) + v.Set(cfgFieldName, fieldVal) } return v, nil } diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index 3eee88c06..87f72cf4e 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -24,28 +24,40 @@ import ( "testing" ) -func TestParseFieldIndexesFromConfig(t *testing.T) { +func TestParseFieldsConfig(t *testing.T) { // Test default config cgrConfig, _ := config.NewDefaultCGRConfig() // Test primary field index definition cgrConfig.CdrcAccIdField = "detect_me" cdrc := &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err == nil { + if err := cdrc.parseFieldsConfig(); err == nil { t.Error("Failed detecting error in accounting id definition", err) } + cgrConfig.CdrcAccIdField = "^static_val" + cgrConfig.CdrcSubjectField = "1" + cdrc = &Cdrc{cgrCfg: cgrConfig} + if err := cdrc.parseFieldsConfig(); err != nil { + t.Error("Failed to corectly parse primary fields %v", cdrc.cfgCdrFields) + } + cgrConfig.CdrcExtraFields = []string{"^static_val:orig_ip"} // Test extra field index definition cgrConfig.CdrcAccIdField = "0" // Put back as int - cgrConfig.CdrcExtraFields = []string{"supplier1", "11:orig_ip"} + cgrConfig.CdrcExtraFields = []string{"supplier1", "orig_ip:11"} cdrc = &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err == nil { + if err := cdrc.parseFieldsConfig(); err == nil { t.Error("Failed detecting error in extra fields definition", err) } + cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"} + cdrc = &Cdrc{cgrCfg: cgrConfig} + if err := cdrc.parseFieldsConfig(); err != nil { + t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields) + } } func TestCdrAsHttpForm(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() cdrc := &Cdrc{cgrCfg: cgrConfig} - if err := cdrc.parseFieldIndexesFromConfig(); err != nil { + if err := cdrc.parseFieldsConfig(); err != nil { t.Error("Failed parsing default fieldIndexesFromConfig", err) } cdrRow := []string{"firstField", "secondField"} @@ -64,7 +76,7 @@ func TestCdrAsHttpForm(t *testing.T) { if cdrAsForm.Get(utils.REQTYPE) != "prepaid" { t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) } - if cdrAsForm.Get("supplier") != "supplier1" { - t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE)) - } + //if cdrAsForm.Get("supplier") != "supplier1" { + // t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier")) + //} } diff --git a/config/config.go b/config/config.go index 9920290e4..2db5edf22 100644 --- a/config/config.go +++ b/config/config.go @@ -110,7 +110,7 @@ type CGRConfig struct { CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs. CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs. CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs. - CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2" + CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "field1:index1,field2:index2" SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer @@ -200,7 +200,7 @@ func (self *CGRConfig) setDefaults() error { self.CdrcDestinationField = "7" self.CdrcAnswerTimeField = "8" self.CdrcDurationField = "9" - self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} + self.CdrcExtraFields = []string{} self.MediatorEnabled = false self.MediatorListen = "127.0.0.1:2032" self.MediatorRater = "127.0.0.1:2012" diff --git a/config/config_test.go b/config/config_test.go index 0a70f3934..0e43924d8 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -103,7 +103,7 @@ func TestDefaults(t *testing.T) { eCfg.CdrcDestinationField = "7" eCfg.CdrcAnswerTimeField = "8" eCfg.CdrcDurationField = "9" - eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"} + eCfg.CdrcExtraFields = []string{} eCfg.MediatorEnabled = false eCfg.MediatorListen = "127.0.0.1:2032" eCfg.MediatorRater = "127.0.0.1:2012" diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index 1af1f445f..fa3900a67 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -74,7 +74,7 @@ # destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. # answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. # duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: :[,:] +# extra_fields = # Extra fields identifiers. For .csv, format: :[...,:] [mediator] # enabled = false # Starts Mediator service: . diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 1df69b393..543243883 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -33,5 +33,8 @@ func NewMySQLStorage(host, port, name, user, password string) (Storage, error) { if err != nil { return nil, err } + if err := db.Ping(); err != nil { + return nil, err + } return &MySQLStorage{&SQLStorage{db}}, nil } From 993c5400aabf1b3073b370bc39b129ce8cf77832 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 13 Jan 2014 13:47:25 +0100 Subject: [PATCH 08/10] Static fields in CDRC tutorial example, rotate csv files by script provided --- .../fs_csv/cgrates/etc/cgrates/cgrates.cfg | 22 +++++++++---------- docs/tut_freeswitch_csv.rst | 7 ++++-- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg index a5ccac8c6..215fd6513 100644 --- a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg @@ -64,17 +64,17 @@ cdr_type = freeswitch_csv # CDR file format . cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are stored. cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved. cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. -# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. -# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. -# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs. -# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs. -# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs. -# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs. -# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs. -# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs. -# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs. -# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs. -# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: :[,:] +accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs. +reqtype_field = 15 # Request type field identifier. Use index number in case of .csv cdrs. +direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs. +tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs. +tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs. +account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs. +subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs. +destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs. +answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs. +duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs. +extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: : [mediator] enabled = true # Starts Mediator service: . diff --git a/docs/tut_freeswitch_csv.rst b/docs/tut_freeswitch_csv.rst index 6721cdbf3..80de413e2 100644 --- a/docs/tut_freeswitch_csv.rst +++ b/docs/tut_freeswitch_csv.rst @@ -152,11 +152,14 @@ CDR processing -------------- For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. -In order to avoid double-processing we will use the rotate mechanism built in FreeSWITCH_. We rotate files via *fs_console* command: +In order to avoid double-processing we will use the rotate mechanism built in FreeSWITCH_. +Once rotated, we will move the resulted files inside the path considered by **CGRateS** *CDRC* component as inbound. + +These steps are automated in a script provided in the */usr/share/cgrates/scripts* location: :: - fs_cli -x "cdr_csv rotate" + /usr/share/cgrates/scripts/freeswitch_cdr_csv_rotate.sh On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. As soon as the CDR will hit CDRS component, mediation will occur, either considering the costs calculated in case of prepaid and postpaid calls out of *cost_details* table or query it's own one from rater in case of *pseudoprepaid* and *rated* CDRs. From 9428ffa9c654d094cec0ef4917d409a2be45fe39 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 13 Jan 2014 20:24:36 +0100 Subject: [PATCH 09/10] Fixups for loader to work with dry_run when destinations file not provided, config fixups, doc fixups --- cdrc/cdrc.go | 2 -- data/conf/cgrates.cfg | 10 +++++----- data/scripts/freeswitch_cdr_csv_rotate.sh | 2 +- docs/installation.rst | 8 ++++---- engine/loader_csv.go | 19 +++++++++++-------- engine/storage_sql.go | 2 +- pkg/debian/postinst | 3 ++- pkg/debian/rules | 1 + utils/consts.go | 2 +- 9 files changed, 26 insertions(+), 23 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index d6e539f97..7393b95e8 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -74,7 +74,6 @@ func (self *Cdrc) Run() error { self.processCdrDir() time.Sleep(self.cgrCfg.CdrcRunDelay) } - return nil } // Loads all fields (primary and extra) into cfgCdrFields, do some pre-checks (eg: in case of csv make sure that values are integers) @@ -178,7 +177,6 @@ func (self *Cdrc) trackCDRFiles() (err error) { engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error())) } } - return } // Processe file at filePath and posts the valid cdr rows out of it diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg index fa3900a67..1ba1e1a2c 100644 --- a/data/conf/cgrates.cfg +++ b/data/conf/cgrates.cfg @@ -51,9 +51,9 @@ # mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal> [cdre] -# cdr_format = csv # Exported CDRs format -# extra_fields = # List of extra fields to be exported out in CDRs -# export_dir = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed +# cdr_format = csv # Exported CDRs format +# extra_fields = # List of extra fields to be exported out in CDRs +# export_dir = /var/log/cgrates/cdr/cdrexport/csv # Path where the exported CDRs will be placed [cdrc] # enabled = false # Enable CDR client functionality @@ -61,8 +61,8 @@ # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify # cdr_type = csv # CDR file format . -# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored. -# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved. +# cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. +# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved. # cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. # accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs. # reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs. diff --git a/data/scripts/freeswitch_cdr_csv_rotate.sh b/data/scripts/freeswitch_cdr_csv_rotate.sh index 09eae3564..90d096f7c 100755 --- a/data/scripts/freeswitch_cdr_csv_rotate.sh +++ b/data/scripts/freeswitch_cdr_csv_rotate.sh @@ -1,7 +1,7 @@ #! /usr/bin/env sh FS_CDR_CSV_DIR=/var/log/freeswitch/cdr-csv -CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/in/csv +CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/cdrc/in /usr/bin/fs_cli -x "cdr_csv rotate" diff --git a/docs/installation.rst b/docs/installation.rst index ab383ab6d..76f9bfa94 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -51,13 +51,13 @@ As DataDB types (rating and accounting subsystems): - Redis_ -As StorDB (persistent storage for CDRs and tariff plan versions): +As StorDB (persistent storage for CDRs and tariff plan versions). + +Once installed there should be no special requirements in terms of setup since no schema is necessary. - MySQL_ -Redis_: once installed there should be no special requirements in terms of setup since no schema is necessary. - -MySQL_: once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package) +Once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package) :: diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 1752c724a..e839111ef 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -334,13 +334,15 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { break } } - if !destinationExists { - if dbExists, err := csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil { + var err error + if !destinationExists && csvr.dataStorage != nil { + if destinationExists, err = csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil { return err - } else if !dbExists { - return fmt.Errorf("Could not get destination for tag %v", record[1]) } } + if !destinationExists { + return fmt.Errorf("Could not get destination for tag %v", record[1]) + } dr := &utils.TPDestinationRate{ DestinationRateId: tag, DestinationRates: []*utils.DestinationRate{ @@ -417,13 +419,14 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { csvr.ratingProfiles[key] = rp } _, exists := csvr.ratingPlans[record[5]] - if !exists { - if dbExists, err := csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil { + if !exists && csvr.dataStorage != nil { + if exists, err = csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil { return err - } else if !dbExists { - return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) } } + if !exists { + return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) + } rpa := &RatingPlanActivation{ ActivationTime: at, RatingPlanId: record[5], diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 0f4e34b78..fa57d834a 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -668,7 +668,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e } func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) { - row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) + row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid)) var accid, src string var timespansJson string cc = &CallCost{Cost: -1} diff --git a/pkg/debian/postinst b/pkg/debian/postinst index af3e428a3..fe1460296 100755 --- a/pkg/debian/postinst +++ b/pkg/debian/postinst @@ -27,9 +27,10 @@ set -e case "$1" in configure) - adduser --quiet --system --group --disabled-password --shell /bin/false --home /var/run/cgrates --gecos "CGRateS" cgrates || true + adduser --quiet --system --group --disabled-password --shell /bin/false --gecos "CGRateS" cgrates || true chown -R cgrates:cgrates /var/log/cgrates/ chown -R cgrates:cgrates /usr/share/cgrates/ + chown -R cgrates:cgrates /var/run/cgrates/ ;; abort-upgrade|abort-remove|abort-deconfigure) diff --git a/pkg/debian/rules b/pkg/debian/rules index 889fbd6cb..047c58cc1 100755 --- a/pkg/debian/rules +++ b/pkg/debian/rules @@ -33,6 +33,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrc/out mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrexport/csv mkdir -p $(PKGDIR)/var/log/cgrates/history + mkdir -p $(PKGDIR)/var/run/cgrates dh_strip dh_compress dh_fixperms diff --git a/utils/consts.go b/utils/consts.go index 5f17455f2..070638c1f 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1,7 +1,7 @@ package utils const ( - VERSION = "0.9.1rc3" + VERSION = "0.9.1c3" POSTGRES = "postgres" MYSQL = "mysql" MONGO = "mongo" From 9feadce3627bfb92cb02a469c0b669ee594fce6f Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 13 Jan 2014 20:57:18 +0100 Subject: [PATCH 10/10] Fixups freeswitch rotate script, tutorial config --- data/scripts/freeswitch_cdr_csv_rotate.sh | 3 +-- data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/data/scripts/freeswitch_cdr_csv_rotate.sh b/data/scripts/freeswitch_cdr_csv_rotate.sh index 90d096f7c..09bcbd3f5 100755 --- a/data/scripts/freeswitch_cdr_csv_rotate.sh +++ b/data/scripts/freeswitch_cdr_csv_rotate.sh @@ -5,8 +5,7 @@ CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/cdrc/in /usr/bin/fs_cli -x "cdr_csv rotate" -find $FS_CDR_CSV_DIR -maxdepth 1 -mindepth 1 -not -name *.csv -print0 | xargs -0 mv -t $CGR_CDRC_IN_DIR +find $FS_CDR_CSV_DIR -maxdepth 1 -mindepth 1 -not -name *.csv -exec chown cgrates:cgrates '{}' \; -exec mv '{}' $CGR_CDRC_IN_DIR \; exit 0 - diff --git a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg index 215fd6513..5ee293764 100644 --- a/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg +++ b/data/tutorials/fs_csv/cgrates/etc/cgrates/cgrates.cfg @@ -61,11 +61,11 @@ enabled = true # Enable CDR client functionality # cdrs_method = http_cgr # Mechanism to use when posting CDRs on server # run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify cdr_type = freeswitch_csv # CDR file format . -cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are stored. +cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored. cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved. cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database. accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs. -reqtype_field = 15 # Request type field identifier. Use index number in case of .csv cdrs. +reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs. direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs. tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs. tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs.