Merge branch 'master' into shared_balances

Conflicts:
	engine/loader_csv.go
This commit is contained in:
Radu Ioan Fericean
2014-01-13 22:34:19 +02:00
22 changed files with 241 additions and 113 deletions

View File

@@ -51,10 +51,8 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
return nil, fmt.Errorf("Folder %s does not exist", dir)
}
}
if cdrc.cgrCfg.CdrcCdrType == CSV {
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
return nil, err
}
if err := cdrc.parseFieldsConfig(); err != nil {
return nil, err
}
cdrc.httpClient = new(http.Client)
return cdrc, nil
@@ -62,7 +60,7 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
type Cdrc struct {
cgrCfg *config.CGRConfig
fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
cfgCdrFields map[string]string // Key is the name of the field
httpClient *http.Client
}
@@ -76,22 +74,25 @@ func (self *Cdrc) Run() error {
self.processCdrDir()
time.Sleep(self.cgrCfg.CdrcRunDelay)
}
return nil
}
// Parses fieldIndex strings into fieldIndex integers needed
func (self *Cdrc) parseFieldIndexesFromConfig() error {
// Loads all fields (primary and extra) into cfgCdrFields, do some pre-checks (eg: in case of csv make sure that values are integers)
func (self *Cdrc) parseFieldsConfig() error {
var err error
self.fieldIndxes = make(map[string]int)
fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION}
fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField,
self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField}
for i, strVal := range fieldIdxStrs {
if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil {
return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i])
}
}
// Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"}
self.cfgCdrFields = map[string]string{
utils.ACCID: self.cgrCfg.CdrcAccIdField,
utils.REQTYPE: self.cgrCfg.CdrcReqTypeField,
utils.DIRECTION: self.cgrCfg.CdrcDirectionField,
utils.TENANT: self.cgrCfg.CdrcTenantField,
utils.TOR: self.cgrCfg.CdrcTorField,
utils.ACCOUNT: self.cgrCfg.CdrcAccountField,
utils.SUBJECT: self.cgrCfg.CdrcSubjectField,
utils.DESTINATION: self.cgrCfg.CdrcDestinationField,
utils.ANSWER_TIME: self.cgrCfg.CdrcAnswerTimeField,
utils.DURATION: self.cgrCfg.CdrcDurationField,
}
// Add extra fields here, config extra fields in the form of []string{"fieldName1:indxInCsv1","fieldName2: indexInCsv2"}
for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields {
splt := strings.Split(fieldWithIdx, ":")
if len(splt) != 2 {
@@ -100,8 +101,14 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error {
if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) {
return errors.New("Extra cdrc.extra_fields overwriting primary fields")
}
if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil {
return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1])
self.cfgCdrFields[splt[0]] = splt[1]
}
// Fields populated, do some sanity checks here
for cdrField, cfgVal := range self.cfgCdrFields {
if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) && !strings.HasPrefix(cfgVal, utils.STATIC_VALUE_PREFIX) {
if _, err = strconv.Atoi(cfgVal); err != nil {
return fmt.Errorf("Cannot parse configuration field %s into integer", cdrField)
}
}
}
return nil
@@ -112,11 +119,22 @@ func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
// engine.Logger.Info(fmt.Sprintf("Processing record %v", record))
v := url.Values{}
v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId)
for fldName, idx := range self.fieldIndxes {
if len(record) <= idx {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName)
for cfgFieldName, cfgFieldVal := range self.cfgCdrFields {
var fieldVal string
if strings.HasPrefix(cfgFieldVal, utils.STATIC_VALUE_PREFIX) {
fieldVal = cfgFieldVal[1:]
} else if utils.IsSliceMember([]string{CSV, FS_CSV}, self.cgrCfg.CdrcCdrType) {
if cfgFieldIdx, err := strconv.Atoi(cfgFieldVal); err != nil { // Should in theory never happen since we have already parsed config
return nil, err
} else if len(record) <= cfgFieldIdx {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cfgFieldName)
} else {
fieldVal = record[cfgFieldIdx]
}
} else { // Modify here when we add more supported cdr formats
fieldVal = "UNKNOWN"
}
v.Set(fldName, record[idx])
v.Set(cfgFieldName, fieldVal)
}
return v, nil
}
@@ -152,14 +170,13 @@ func (self *Cdrc) trackCDRFiles() (err error) {
case ev := <-watcher.Event:
if ev.IsCreate() && (self.cgrCfg.CdrcCdrType != FS_CSV || path.Ext(ev.Name) != ".csv") {
if err = self.processFile(ev.Name); err != nil {
return err
engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error()))
}
}
case err := <-watcher.Error:
engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
}
}
return
}
// Processe file at filePath and posts the valid cdr rows out of it

View File

@@ -24,28 +24,40 @@ import (
"testing"
)
func TestParseFieldIndexesFromConfig(t *testing.T) {
func TestParseFieldsConfig(t *testing.T) {
// Test default config
cgrConfig, _ := config.NewDefaultCGRConfig()
// Test primary field index definition
cgrConfig.CdrcAccIdField = "detect_me"
cdrc := &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err == nil {
if err := cdrc.parseFieldsConfig(); err == nil {
t.Error("Failed detecting error in accounting id definition", err)
}
cgrConfig.CdrcAccIdField = "^static_val"
cgrConfig.CdrcSubjectField = "1"
cdrc = &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldsConfig(); err != nil {
t.Error("Failed to corectly parse primary fields %v", cdrc.cfgCdrFields)
}
cgrConfig.CdrcExtraFields = []string{"^static_val:orig_ip"}
// Test extra field index definition
cgrConfig.CdrcAccIdField = "0" // Put back as int
cgrConfig.CdrcExtraFields = []string{"supplier1", "11:orig_ip"}
cgrConfig.CdrcExtraFields = []string{"supplier1", "orig_ip:11"}
cdrc = &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err == nil {
if err := cdrc.parseFieldsConfig(); err == nil {
t.Error("Failed detecting error in extra fields definition", err)
}
cgrConfig.CdrcExtraFields = []string{"supplier1:^top_supplier", "orig_ip:11"}
cdrc = &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldsConfig(); err != nil {
t.Errorf("Failed to corectly parse extra fields %v",cdrc.cfgCdrFields)
}
}
func TestCdrAsHttpForm(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrc := &Cdrc{cgrCfg: cgrConfig}
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
if err := cdrc.parseFieldsConfig(); err != nil {
t.Error("Failed parsing default fieldIndexesFromConfig", err)
}
cdrRow := []string{"firstField", "secondField"}
@@ -64,7 +76,7 @@ func TestCdrAsHttpForm(t *testing.T) {
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}
if cdrAsForm.Get("supplier") != "supplier1" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}
//if cdrAsForm.Get("supplier") != "supplier1" {
// t.Error("Unexpected CDR value received", cdrAsForm.Get("supplier"))
//}
}

View File

@@ -110,12 +110,13 @@ type CGRConfig struct {
CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs.
CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs.
CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs.
CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "field1:index1,field2:index2"
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMRaterReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMMaxCallDuration time.Duration // The maximum duration of a call
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorListen string // Mediator's listening interface: <internal>.
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
@@ -199,7 +200,7 @@ func (self *CGRConfig) setDefaults() error {
self.CdrcDestinationField = "7"
self.CdrcAnswerTimeField = "8"
self.CdrcDurationField = "9"
self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
self.CdrcExtraFields = []string{}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
@@ -219,6 +220,7 @@ func (self *CGRConfig) setDefaults() error {
self.SMRater = "127.0.0.1:2012"
self.SMRaterReconnects = 3
self.SMDebitInterval = 10
self.SMMaxCallDuration = time.Duration(3) * time.Hour
self.FreeswitchServer = "127.0.0.1:8021"
self.FreeswitchPass = "ClueCon"
self.FreeswitchReconnects = 5
@@ -519,6 +521,12 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("session_manager", "debit_interval"); hasOpt {
cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval")
}
if hasOpt = c.HasOption("session_manager", "max_call_duration"); hasOpt {
maxCallDurStr,_ := c.GetString("session_manager", "max_call_duration")
if cfg.SMMaxCallDuration, errParse = utils.ParseDurationWithSecs(maxCallDurStr); errParse != nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("freeswitch", "server"); hasOpt {
cfg.FreeswitchServer, _ = c.GetString("freeswitch", "server")
}

View File

@@ -103,7 +103,7 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcDestinationField = "7"
eCfg.CdrcAnswerTimeField = "8"
eCfg.CdrcDurationField = "9"
eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
eCfg.CdrcExtraFields = []string{}
eCfg.MediatorEnabled = false
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
@@ -123,6 +123,7 @@ func TestDefaults(t *testing.T) {
eCfg.SMRater = "127.0.0.1:2012"
eCfg.SMRaterReconnects = 3
eCfg.SMDebitInterval = 10
eCfg.SMMaxCallDuration = time.Duration(3) * time.Hour
eCfg.FreeswitchServer = "127.0.0.1:8021"
eCfg.FreeswitchPass = "ClueCon"
eCfg.FreeswitchReconnects = 5
@@ -245,6 +246,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.SMRater = "test"
eCfg.SMRaterReconnects = 99
eCfg.SMDebitInterval = 99
eCfg.SMMaxCallDuration = time.Duration(99)*time.Second
eCfg.FreeswitchServer = "test"
eCfg.FreeswitchPass = "test"
eCfg.FreeswitchReconnects = 99

View File

@@ -94,8 +94,9 @@ duration_fields = test # Name of duration fields to be used during mediation.
enabled = true # Starts SessionManager service: <true|false>.
switch_type = test # Defines the type of switch behind: <freeswitch>.
rater = test # Address where to reach the Rater.
rater_reconnects = 99 # Number of reconnects to rater before giving up.
debit_interval = 99 # Interval to perform debits on.
rater_reconnects = 99 # Number of reconnects to rater before giving up.
debit_interval = 99 # Interval to perform debits on.
max_call_duration = 99 # Maximum call duration a prepaid call can last
[freeswitch]
server = test # Adress where to connect to FreeSWITCH socket.

View File

@@ -51,9 +51,9 @@
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
[cdre]
# cdr_format = csv # Exported CDRs format <csv>
# extra_fields = # List of extra fields to be exported out in CDRs
# export_dir = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed
# cdr_format = csv # Exported CDRs format <csv>
# extra_fields = # List of extra fields to be exported out in CDRs
# export_dir = /var/log/cgrates/cdr/cdrexport/csv # Path where the exported CDRs will be placed
[cdrc]
# enabled = false # Enable CDR client functionality
@@ -61,8 +61,8 @@
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
# cdr_type = csv # CDR file format <csv|freeswitch_csv>.
# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored.
# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved.
# cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored.
# cdr_out_dir = /var/log/cgrates/cdr/cdrc/out # Absolute path towards the directory where processed CDRs will be moved.
# cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database.
# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs.
# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs.
@@ -74,7 +74,7 @@
# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: <index_extrafield_1>:<label_extrafield_1>[,<index_extrafield_n>:<label_extrafield_n>]
# extra_fields = # Extra fields identifiers. For .csv, format: <label_extrafield_1>:<index_extrafield_1>[...,<label_extrafield_n>:<index_extrafield_n>]
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
@@ -97,7 +97,8 @@
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
# debit_interval = 10 # Interval to perform debits on.
# max_call_duration = 3h # Maximum call duration a prepaid call can last
[freeswitch]
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.

View File

@@ -0,0 +1,11 @@
#! /usr/bin/env sh
FS_CDR_CSV_DIR=/var/log/freeswitch/cdr-csv
CGR_CDRC_IN_DIR=/var/log/cgrates/cdr/cdrc/in
/usr/bin/fs_cli -x "cdr_csv rotate"
find $FS_CDR_CSV_DIR -maxdepth 1 -mindepth 1 -not -name *.csv -exec chown cgrates:cgrates '{}' \; -exec mv '{}' $CGR_CDRC_IN_DIR \;
exit 0

View File

@@ -14,8 +14,8 @@ CREATE TABLE `cost_details` (
`account` varchar(128) NOT NULL,
`subject` varchar(128) NOT NULL,
`destination` varchar(128) NOT NULL,
`cost` DECIMAL(20,4) NOT NULL,
`connect_fee` DECIMAL(5,4) NOT NULL,
`cost` DECIMAL(20,4) NOT NULL,
`timespans` text,
`source` varchar(64) NOT NULL,
`runid` varchar(64) NOT NULL,

View File

@@ -61,20 +61,20 @@ enabled = true # Enable CDR client functionality
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
cdr_type = freeswitch_csv # CDR file format <csv>.
cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are stored.
cdr_in_dir = /var/log/cgrates/cdr/cdrc/in # Absolute path towards the directory where the CDRs are stored.
cdr_out_dir = /tmp # Absolute path towards the directory where processed CDRs will be moved.
cdr_source_id = freeswitch_csv # Free form field, tag identifying the source of the CDRs within CGRS database.
# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs.
# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs.
# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs.
# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs.
# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs.
# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs.
# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs.
# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
# extra_fields = 10:supplier,11:orig_ip # Extra fields identifiers. For .csv, format: <index_extrafield_1>:<label_extrafield_1>[,<index_extrafield_n>:<label_extrafield_n>]
accid_field = 10 # Accounting id field identifier. Use index number in case of .csv cdrs.
reqtype_field = 16 # Request type field identifier. Use index number in case of .csv cdrs.
direction_field = ^*out # Direction field identifier. Use index numbers in case of .csv cdrs.
tenant_field = ^cgrates.org # Tenant field identifier. Use index numbers in case of .csv cdrs.
tor_field = ^call # Type of Record field identifier. Use index numbers in case of .csv cdrs.
account_field = 1 # Account field identifier. Use index numbers in case of .csv cdrs.
subject_field = 1 # Subject field identifier. Use index numbers in case of .csv CDRs.
destination_field = 2 # Destination field identifier. Use index numbers in case of .csv cdrs.
answer_time_field = 5 # Answer time field identifier. Use index numbers in case of .csv cdrs.
duration_field = 8 # Duration field identifier. Use index numbers in case of .csv cdrs.
extra_fields = read_codec:13,write_codec:14 # Extra fields identifiers. For .csv, format: <label_extrafield1>:<index_extrafield_1>
[mediator]
enabled = true # Starts Mediator service: <true|false>.

View File

@@ -254,7 +254,7 @@
-->
<!-- Place all prepaid requests into park app. Notify must be empty so we do not loop. -->
<extension name="CGRateS_AuthPrepaid">
<condition field="${cgr_reqtype}" expression="^prepaid$" />
<condition field="${cgr_reqtype}" expression="^prepaid$|^pseudoprepaid$" />
<condition field="${cgr_notify}" expression="^$">
<action application="park"/>
</condition>
@@ -262,7 +262,7 @@
<!-- In case of CGRateS returning INSUFFICIENT_FUNDS, play the message (if recorded) and disconnect the call with 403. -->
<extension name="CGRateS_AuthForbidden">
<condition field="${cgr_reqtype}" expression="prepaid" />
<condition field="${cgr_reqtype}" expression="^prepaid$|^pseudoprepaid$" />
<condition field="${cgr_notify}" expression="^-INSUFFICIENT_FUNDS$">
<action application="playback" data="tone_stream://path=$${sounds_dir}/insufficient_funds.wav"/>
<action application="set" data="proto_specific_hangup_cause=sip:403"/>
@@ -272,7 +272,7 @@
<!-- In case of CGRateS returning SYSTEM_ERROR, disconnect the call so we do not risk prepaid calls going out. -->
<extension name="CGRateS_Error">
<condition field="${cgr_reqtype}" expression="^prepaid$" />
<condition field="${cgr_reqtype}" expression="^prepaid$|^pseudoprepaid$" />
<condition field="${cgr_notify}" expression="^-SYSTEM_ERROR$">
<action application="set" data="proto_specific_hangup_cause=sip:503"/>
<action application="hangup"/>

View File

@@ -51,13 +51,13 @@ As DataDB types (rating and accounting subsystems):
- Redis_
As StorDB (persistent storage for CDRs and tariff plan versions):
As StorDB (persistent storage for CDRs and tariff plan versions).
Once installed there should be no special requirements in terms of setup since no schema is necessary.
- MySQL_
Redis_: once installed there should be no special requirements in terms of setup since no schema is necessary.
MySQL_: once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package)
Once database is installed, CGRateS database needs to be set-up out of provided scripts (example for the paths set-up by debian package)
::

View File

@@ -13,10 +13,10 @@ Scenario
- **CGRateS** with following components:
- CGR-SM started as prepaid controller, with debits taking place at 5s intervals.
- CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR (moving the processed *.csv* files to */tmp* folder).
- CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC.
- CGR-CDRE exporting mediated CDRs (export path: */tmp*).
- CGR-History component keeping the archive of the rates modifications (path: */tmp/cgr_history*).
- CGR-CDRC component importing FreeSWITCH_ generated *.csv* CDRs into CGR and moving the processed *.csv* files to */tmp* folder.
- CGR-Mediator compoenent attaching costs to the raw CDRs from CGR-CDRC inside CGR StorDB.
- CGR-CDRE exporting mediated CDRs from CGR StorDB (export path: */tmp*).
- CGR-History component keeping the archive of the rates modifications (path browsable with git client at */tmp/cgr_history*).
Starting FreeSWITCH_ with custom configuration
@@ -103,28 +103,73 @@ To verify that all actions successfully performed, we use following *cgr-console
Test calls
----------
Calling between 1001 and 1003 should generate prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. The difference between calling from 1001 or 1003 should be reflected in fact that 1001 will generate real-time debits as opposite to 1003 which will only generate debits when CDRs will be processed.
1001 -> 1002
~~~~~~~~~~~~
Since the user 1001 is marked as *prepaid* inside FreeSWITCH_ directory configuration, calling between 1001 and 1002 should generate pre-auth and prepaid debits which can be checked with *get_balance* command integrated within *cgr-console* tool. As per our tariff plans, we should get first 60s charged as a whole, then in intervals of 1s (configured SessionManager debit interval of 10s).
*Note*: An important particularity to note here is the ability of **CGRateS** SessionManager to refund units booked in advance (eg: if debit occurs every 10s and rate increments are set to 1s, the SessionManager will be smart enough to refund pre-booked credits for calls stoped in the middle of debit interval).
Check that 1001 balance is properly debitted, during the call:
::
cgr-console get_balance cgrates.org 1001
1002 -> 1001
~~~~~~~~~~~~
The user 1002 is marked as *postpaid* inside FreeSWITCH_ hence his calls will be debited at the end of the call instead of during a call and his balance will be able to go on negative without influencing his new calls (no pre-auth).
To check that we had debits we use again console command, this time not during the call but at the end of it:
::
cgr-console get_balance cgrates.org 1002
1003 -> 1001
~~~~~~~~~~~~
The user 1003 is marked as *pseudoprepaid* inside FreeSWITCH_ hence his calls will be considered same as prepaid (no call setups possible on negative balance due to pre-auth mechanism) but not handled automatically by session manager. His call costs will be calculated directly out of CDRs and balance updated by the time when mediation process occurs. This is sometimes a good compromise of prepaid running without influencing performance (there are no recurrent call debits during a call).
To check that there are no debits during or by the end of the call, but when the CDR is imported, run the command before and after rotating the FreeSWITCH_ *.csv* CDRs:
::
cgr-console get_balance cgrates.org 1003
1004 -> 1001
~~~~~~~~~~~~
The user 1004 is marked as *rated* inside FreeSWITCH_ hence his calls not interact in any way with accounting subsystem. The only action perfomed by **CGRateS** related to his calls wil be rating/mediation of his CDRs.
CDR processing
--------------
For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file. In order to avoid double-processing them we will use the rotate mechanism built in FreeSWITCH_. We rotate files via *fs_console* command:
For every call FreeSWITCH_ will generate CDR records within the *Master.csv* file.
In order to avoid double-processing we will use the rotate mechanism built in FreeSWITCH_.
Once rotated, we will move the resulted files inside the path considered by **CGRateS** *CDRC* component as inbound.
These steps are automated in a script provided in the */usr/share/cgrates/scripts* location:
::
fs_cli -x "cdr_csv rotate"
/usr/share/cgrates/scripts/freeswitch_cdr_csv_rotate.sh
On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. Once in there mediation will occur, generating the costs inside *rated_cdrs* and *cost_details* tables.
On each rotate CGR-CDRC component will be informed via *inotify* subsystem and will instantly process the CDR file. The records end up in **CGRateS**/StorDB inside *cdrs_primary* table via CGR-CDRS. As soon as the CDR will hit CDRS component, mediation will occur, either considering the costs calculated in case of prepaid and postpaid calls out of *cost_details* table or query it's own one from rater in case of *pseudoprepaid* and *rated* CDRs.
Once the CDRs are mediated, can be exported as *.csv* format again via remote command offered by *cgr-console* tool:
::
cgr-console export_cdrs csv
.. _FreeSWITCH: http://www.freeswitch.org/
.. _Jitsi: http://www.jitsi.org/

View File

@@ -368,6 +368,11 @@ func (cd *CallDescriptor) roundTimeSpansToIncrement(timespans TimeSpans) []*Time
return timespans
}
// Returns call descripor's total duration
func (cd *CallDescriptor) GetDuration() time.Duration {
return cd.TimeEnd.Sub(cd.TimeStart)
}
/*
Creates a CallCost structure with the cost information calculated for the received CallDescriptor.
*/

View File

@@ -334,13 +334,15 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) {
break
}
}
if !destinationExists {
if dbExists, err := csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil {
var err error
if !destinationExists && csvr.dataStorage != nil {
if destinationExists, err = csvr.dataStorage.DataExists(DESTINATION_PREFIX, record[1]); err != nil {
return err
} else if !dbExists {
return fmt.Errorf("Could not get destination for tag %v", record[1])
}
}
if !destinationExists {
return fmt.Errorf("Could not get destination for tag %v", record[1])
}
dr := &utils.TPDestinationRate{
DestinationRateId: tag,
DestinationRates: []*utils.DestinationRate{
@@ -417,13 +419,14 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) {
csvr.ratingProfiles[key] = rp
}
_, exists := csvr.ratingPlans[record[5]]
if !exists {
if dbExists, err := csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil {
if !exists && csvr.dataStorage != nil {
if exists, err = csvr.dataStorage.DataExists(RATING_PLAN_PREFIX, record[5]); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5]))
}
}
if !exists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5]))
}
rpa := &RatingPlanActivation{
ActivationTime: at,
RatingPlanId: record[5],

View File

@@ -33,5 +33,8 @@ func NewMySQLStorage(host, port, name, user, password string) (Storage, error) {
if err != nil {
return nil, err
}
if err := db.Ping(); err != nil {
return nil, err
}
return &MySQLStorage{&SQLStorage{db}}, nil
}

View File

@@ -656,7 +656,7 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
}
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')",
_, err = self.Db.Exec(fmt.Sprintf("INSERT INTO %s (cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source, runid)VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %f, %f, '%s','%s','%s')",
utils.TBL_COST_DETAILS,
utils.FSCgrId(uuid),
uuid,
@@ -666,8 +666,8 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e
cc.Account,
cc.Subject,
cc.Destination,
cc.Cost,
cc.ConnectFee,
cc.Cost,
tss,
source,
runid))
@@ -678,12 +678,12 @@ func (self *SQLStorage) LogCallCost(uuid, source, runid string, cc *CallCost) (e
}
func (self *SQLStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, err error) {
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, cost, connect_fee, timespans, source FROM %s WHERE cgrid='%s' AND source='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid))
row := self.Db.QueryRow(fmt.Sprintf("SELECT cgrid, accid, direction, tenant, tor, account, subject, destination, connect_fee, cost, timespans, source FROM %s WHERE cgrid='%s' AND source='%s' AND runid='%s'", utils.TBL_COST_DETAILS, cgrid, source, runid))
var accid, src string
var timespansJson string
cc = &CallCost{Cost: -1}
err = row.Scan(&cgrid, &accid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Account, &cc.Subject,
&cc.Destination, &cc.Cost, &cc.ConnectFee, &timespansJson, &src)
&cc.Destination, &cc.ConnectFee, &cc.Cost, &timespansJson, &src)
if err = json.Unmarshal([]byte(timespansJson), &cc.Timespans); err != nil {
return nil, err
}

View File

@@ -27,9 +27,10 @@ set -e
case "$1" in
configure)
adduser --quiet --system --group --disabled-password --shell /bin/false --home /var/run/cgrates --gecos "CGRateS" cgrates || true
adduser --quiet --system --group --disabled-password --shell /bin/false --gecos "CGRateS" cgrates || true
chown -R cgrates:cgrates /var/log/cgrates/
chown -R cgrates:cgrates /usr/share/cgrates/
chown -R cgrates:cgrates /var/run/cgrates/
;;
abort-upgrade|abort-remove|abort-deconfigure)

View File

@@ -33,6 +33,7 @@ binary-arch: clean
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrc/out
mkdir -p $(PKGDIR)/var/log/cgrates/cdr/cdrexport/csv
mkdir -p $(PKGDIR)/var/log/cgrates/history
mkdir -p $(PKGDIR)/var/run/cgrates
dh_strip
dh_compress
dh_fixperms

View File

@@ -101,7 +101,7 @@ func (sm *FSSessionManager) GetSession(uuid string) *Session {
// Disconnects a session by sending hangup command to freeswitch
func (sm *FSSessionManager) DisconnectSession(s *Session, notify string) {
engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid))
// engine.Logger.Debug(fmt.Sprintf("Session: %+v", s.uuid))
err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", s.uuid, notify))
if err != nil {
engine.Logger.Err(fmt.Sprintf("could not send disconect api notification to freeswitch: %v", err))
@@ -123,6 +123,17 @@ func (sm *FSSessionManager) RemoveSession(s *Session) {
}
}
// Sets the call timeout valid of starting of the call
func (sm *FSSessionManager) setMaxCallDuration(uuid string, maxDur time.Duration) error {
err := fsock.FS.SendApiCmd(fmt.Sprintf("sched_hangup +%d %s\n\n", int(maxDur.Seconds()), uuid))
if err != nil {
engine.Logger.Err("could not send sched_hangup command to freeswitch")
return err
}
return nil
}
// Sends the transfer command to unpark the call to freeswitch
func (sm *FSSessionManager) unparkCall(uuid, call_dest_nb, notify string) {
err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_notify %s\n\n", uuid, notify))
@@ -140,15 +151,15 @@ func (sm *FSSessionManager) OnHeartBeat(ev Event) {
}
func (sm *FSSessionManager) OnChannelPark(ev Event) {
engine.Logger.Info("freeswitch park")
//engine.Logger.Info("freeswitch park")
startTime, err := ev.GetStartTime(PARK_TIME)
if err != nil {
engine.Logger.Err("Error parsing answer event start time, using time.Now!")
startTime = time.Now()
}
// if there is no account configured leave the call alone
if strings.TrimSpace(ev.GetReqType()) != utils.PREPAID {
return
if !utils.IsSliceMember([]string{utils.PREPAID, utils.PSEUDOPREPAID}, strings.TrimSpace(ev.GetReqType())) {
return // we unpark only prepaid and pseudoprepaid calls
}
if ev.MissingParameter() {
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), MISSING_PARAMETER)
@@ -162,8 +173,9 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
Subject: ev.GetSubject(),
Account: ev.GetAccount(),
Destination: ev.GetDestination(),
Amount: sm.debitPeriod.Seconds(),
TimeStart: startTime}
TimeStart: startTime,
TimeEnd: startTime.Add(cfg.SMMaxCallDuration),
}
var remainingDurationFloat float64
err = sm.connector.GetMaxSessionTime(cd, &remainingDurationFloat)
if err != nil {
@@ -172,17 +184,18 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
return
}
remainingDuration := time.Duration(remainingDurationFloat)
engine.Logger.Info(fmt.Sprintf("Remaining seconds: %v", remainingDuration))
//engine.Logger.Info(fmt.Sprintf("Remaining duration: %v", remainingDuration))
if remainingDuration == 0 {
engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
//engine.Logger.Info(fmt.Sprintf("Not enough credit for trasferring the call %s for %s.", ev.GetUUID(), cd.GetKey(cd.Subject)))
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), INSUFFICIENT_FUNDS)
return
}
sm.setMaxCallDuration(ev.GetUUID(), remainingDuration)
sm.unparkCall(ev.GetUUID(), ev.GetCallDestNr(), AUTH_OK)
}
func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
engine.Logger.Info("<SessionManager> FreeSWITCH answer.")
//engine.Logger.Info("<SessionManager> FreeSWITCH answer.")
// Make sure cgr_type is enforced even if not set by FreeSWITCH
if err := fsock.FS.SendApiCmd(fmt.Sprintf("uuid_setvar %s cgr_reqtype %s\n\n", ev.GetUUID(), ev.GetReqType())); err != nil {
engine.Logger.Err(fmt.Sprintf("Error on attempting to overwrite cgr_type in chan variables: %v", err))
@@ -194,7 +207,7 @@ func (sm *FSSessionManager) OnChannelAnswer(ev Event) {
}
func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
engine.Logger.Info("<SessionManager> FreeSWITCH hangup.")
//engine.Logger.Info("<SessionManager> FreeSWITCH hangup.")
s := sm.GetSession(ev.GetUUID())
if s == nil { // Not handled by us
return
@@ -246,7 +259,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
}
end := lastCC.Timespans[len(lastCC.Timespans)-1].TimeEnd
refundDuration := end.Sub(hangupTime)
initialRefundDuration := refundDuration
//initialRefundDuration := refundDuration
var refundIncrements engine.Increments
for i := len(lastCC.Timespans) - 1; i >= 0; i-- {
ts := lastCC.Timespans[i]
@@ -273,7 +286,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
}
}
// show only what was actualy refunded (stopped in timespan)
engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
// engine.Logger.Info(fmt.Sprintf("Refund duration: %v", initialRefundDuration-refundDuration))
if len(refundIncrements) > 0 {
cd := &engine.CallDescriptor{
Direction: lastCC.Direction,
@@ -293,23 +306,21 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
}
cost := refundIncrements.GetTotalCost()
lastCC.Cost -= cost
engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
// engine.Logger.Info(fmt.Sprintf("Rambursed %v cents", cost))
}
func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor, index float64) (cc *engine.CallCost) {
func (sm *FSSessionManager) LoopAction(s *Session, cd *engine.CallDescriptor) (cc *engine.CallCost) {
cc = &engine.CallCost{}
cd.LoopIndex = index
cd.CallDuration += sm.debitPeriod
err := sm.connector.MaxDebit(*cd, cc)
if err != nil {
engine.Logger.Err(fmt.Sprintf("Could not complete debit opperation: %v", err))
// disconnect session
s.sessionManager.DisconnectSession(s, SYSTEM_ERROR)
}
engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc))
// engine.Logger.Debug(fmt.Sprintf("Result of MaxDebit call: %v", cc))
if cc.GetDuration() == 0 || err != nil {
engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s))
// engine.Logger.Info(fmt.Sprintf("No credit left: Disconnect %v", s))
sm.DisconnectSession(s, INSUFFICIENT_FUNDS)
return
}

View File

@@ -77,6 +77,7 @@ func NewSession(ev Event, sm SessionManager) (s *Session) {
func (s *Session) startDebitLoop() {
nextCd := *s.callDescriptor
index := 0.0
debitPeriod := s.sessionManager.GetDebitPeriod()
for {
select {
case <-s.stopDebit:
@@ -86,9 +87,14 @@ func (s *Session) startDebitLoop() {
if index > 0 { // first time use the session start time
nextCd.TimeStart = nextCd.TimeEnd
}
nextCd.TimeEnd = nextCd.TimeStart.Add(s.sessionManager.GetDebitPeriod())
cc := s.sessionManager.LoopAction(s, &nextCd, index)
nextCd.TimeEnd = cc.GetEndTime()
nextCd.TimeEnd = nextCd.TimeStart.Add(debitPeriod)
nextCd.LoopIndex = index
nextCd.CallDuration += debitPeriod // first presumed duration
cc := s.sessionManager.LoopAction(s, &nextCd)
nextCd.TimeEnd = cc.GetEndTime() // set debited timeEnd
// update call duration with real debited duration
nextCd.CallDuration -= debitPeriod
nextCd.CallDuration += nextCd.GetDuration()
time.Sleep(cc.GetDuration())
index++
}
@@ -106,7 +112,7 @@ func (s *Session) getSessionDurationFrom(now time.Time) (d time.Duration) {
// Stops the debit loop
func (s *Session) Close(ev Event) {
engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid))
// engine.Logger.Debug(fmt.Sprintf("Stopping debit for %s", s.uuid))
if s == nil {
return
}
@@ -140,6 +146,6 @@ func (s *Session) SaveOperations() {
engine.Logger.Err("<SessionManager> Error: no connection to logger database, cannot save costs")
}
s.sessionManager.GetDbLogger().LogCallCost(s.uuid, engine.SESSION_MANAGER_SOURCE, utils.DEFAULT_RUNID, firstCC)
engine.Logger.Debug(fmt.Sprintf("<SessionManager> End of call, having costs: %v", firstCC.String()))
// engine.Logger.Debug(fmt.Sprintf("<SessionManager> End of call, having costs: %v", firstCC.String()))
}()
}

View File

@@ -19,16 +19,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"time"
)
type SessionManager interface {
Connect(*config.CGRConfig) error
DisconnectSession(*Session, string)
RemoveSession(*Session)
LoopAction(*Session, *engine.CallDescriptor, float64) *engine.CallCost
LoopAction(*Session, *engine.CallDescriptor) *engine.CallCost
GetDebitPeriod() time.Duration
GetDbLogger() engine.LogStorage
Shutdown() error

View File

@@ -1,7 +1,7 @@
package utils
const (
VERSION = "0.9.1rc3"
VERSION = "0.9.1c3"
POSTGRES = "postgres"
MYSQL = "mysql"
MONGO = "mongo"