Mediator refactoring to support CDR as imput instead of csv row - adding separation of names and indexes, StartTime->AnswerTime, adding reqtype=rated

This commit is contained in:
DanB
2013-06-06 12:37:36 +02:00
parent fbbb26fab3
commit a75c2e7324
27 changed files with 523 additions and 461 deletions

View File

@@ -38,7 +38,7 @@ func cdrHandler(w http.ResponseWriter, r *http.Request) {
if fsCdr, err := new(FSCdr).New(body); err == nil {
storage.SetCdr(fsCdr)
if cfg.CDRSMediator == "internal" {
errMedi := medi.MediateCdrFromDB(fsCdr, storage)
errMedi := medi.MediateDBCDR(fsCdr, storage)
if errMedi != nil {
rater.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error()))
}

View File

@@ -21,7 +21,6 @@ package cdrs
import (
"encoding/json"
"errors"
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/utils"
"strconv"
"time"
@@ -41,8 +40,8 @@ const (
CSTMID = "cgr_cstmid"
CALL_DEST_NR = "dialed_extension"
PARK_TIME = "start_epoch"
START_TIME = "answer_epoch"
END_TIME = "end_epoch"
ANSWER_TIME = "answer_epoch"
HANGUP_TIME = "end_epoch"
DURATION = "billsec"
USERNAME = "user_name"
FS_IP = "sip_local_network_addr"
@@ -50,7 +49,7 @@ const (
type FSCdr map[string]string
func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) {
func (fsCdr FSCdr) New(body []byte) (utils.CDR, error) {
fsCdr = make(map[string]string)
var tmp map[string]interface{}
var err error
@@ -95,35 +94,29 @@ func (fsCdr FSCdr) GetDestination() string {
return utils.FirstNonEmpty(fsCdr[DESTINATION], fsCdr[CALL_DEST_NR])
}
// Original dialed destination number, useful in case of unpark
func (fsCdr FSCdr) GetCallDestNr() string {
return fsCdr[CALL_DEST_NR]
}
func (fsCdr FSCdr) GetTOR() string {
return utils.FirstNonEmpty(fsCdr[TOR], cfg.DefaultTOR)
}
func (fsCdr FSCdr) GetUUID() string {
return fsCdr[UUID]
}
func (fsCdr FSCdr) GetTenant() string {
return utils.FirstNonEmpty(fsCdr[CSTMID], cfg.DefaultTenant)
}
func (fsCdr FSCdr) GetReqType() string {
return utils.FirstNonEmpty(fsCdr[REQTYPE], cfg.DefaultReqType)
}
func (fsCdr FSCdr) GetExtraParameters() string {
return "" // ToDo: Add and extract from config
func (fsCdr FSCdr) GetExtraFields() map[string]string {
return nil // ToDo: Add and extract from config
}
func (fsCdr FSCdr) GetFallbackSubj() string {
return cfg.DefaultSubject
}
func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[START_TIME], 0, 64)
func (fsCdr FSCdr) GetAnswerTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[ANSWER_TIME], 0, 64)
t = time.Unix(0, st*1000)
return
}
func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[END_TIME], 0, 64)
func (fsCdr FSCdr) GetHangupTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[HANGUP_TIME], 0, 64)
t = time.Unix(0, st*1000)
return
}
@@ -143,24 +136,22 @@ func (fsCdr FSCdr) Store() (result string, err error) {
result += fsCdr.GetSubject() + "|"
result += fsCdr.GetAccount() + "|"
result += fsCdr.GetDestination() + "|"
result += fsCdr.GetCallDestNr() + "|"
result += fsCdr.GetTOR() + "|"
result += fsCdr.GetUUID() + "|"
result += fsCdr.GetAccId() + "|"
result += fsCdr.GetTenant() + "|"
result += fsCdr.GetReqType() + "|"
st, err := fsCdr.GetStartTime()
st, err := fsCdr.GetAnswerTime()
if err != nil {
return "", err
}
result += strconv.FormatInt(st.UnixNano(), 10) + "|"
et, err := fsCdr.GetEndTime()
et, err := fsCdr.GetHangupTime()
if err != nil {
return "", err
}
result += strconv.FormatInt(et.UnixNano(), 10) + "|"
result += strconv.FormatInt(fsCdr.GetDuration(), 10) + "|"
result += fsCdr.GetFallbackSubj() + "|"
result += fsCdr.GetExtraParameters() + "|"
return
}

View File

@@ -29,11 +29,11 @@ import (
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"io"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"runtime"
"strconv"
"time"
@@ -50,7 +50,6 @@ const (
REDIS = "redis"
SAME = "same"
FS = "freeswitch"
FSCDR_FILE_CSV = "freeswitch_file_csv"
)
var (
@@ -130,19 +129,9 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) {
exitChan <- true
}
if cfg.MediatorCDRType == FSCDR_FILE_CSV { //Mediator as standalone service for file CDRs
if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir))
exitChan <- true
}
if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir))
exitChan <- true
}
medi.TrackCDRFiles(cfg.MediatorCDRInDir)
if cfg.MediatorCDRType == utils.FSCDR_FILE_CSV { //Mediator as standalone service for file CDRs
medi.TrackCDRFiles()
}
}
func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) {

View File

@@ -22,6 +22,7 @@ import (
"code.google.com/p/goconf/conf"
"errors"
"fmt"
"github.com/cgrates/cgrates/utils"
)
const (
@@ -67,7 +68,8 @@ type CGRConfig struct {
SchedulerEnabled bool
CDRSListen string // CDRS's listening interface: <x.y.z.y:1234>.
CDRSfsJSONEnabled bool // Enable the handler for FreeSWITCH JSON CDRs: <enabled|disabled>.
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSExtraFields []string //Extra fields to store in CDRs
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
@@ -79,15 +81,15 @@ type CGRConfig struct {
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
MediatorCDRType string // CDR type <freeswitch_http_json|freeswitch_file_csv>.
MediatorAccIdField string // Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
MediatorSubjectFields string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorReqTypeFields string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTimeStartFields string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTimeAnswerFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorCDRInDir string // Absolute path towards the directory where the CDRs are kept (file stored CDRs).
MediatorCDROutDir string // Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
FreeswitchServer string // freeswitch address host:port
@@ -122,21 +124,22 @@ func ( self *CGRConfig ) setDefaults() error {
self.CDRSListen = "127.0.0.1:2022"
self.CDRSfsJSONEnabled = false
self.CDRSMediator = INTERNAL
self.CDRSExtraFields = []string{}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
self.MediatorRaterReconnects = 3
self.MediatorCDRType = "freeswitch_http_json"
self.MediatorCDRType = utils.FSCDR_HTTP_JSON
self.MediatorAccIdField = "accid"
self.MediatorSubjectFields = "subject"
self.MediatorReqTypeFields = "reqtype"
self.MediatorDirectionFields = "direction"
self.MediatorTenantFields = "tenant"
self.MediatorTORFields = "tor"
self.MediatorAccountFields = "account"
self.MediatorDestFields = "destination"
self.MediatorTimeStartFields = "time_start"
self.MediatorDurationFields = "duration"
self.MediatorSubjectFields = []string{"subject"}
self.MediatorReqTypeFields = []string{"reqtype"}
self.MediatorDirectionFields = []string{"direction"}
self.MediatorTenantFields = []string{"tenant"}
self.MediatorTORFields = []string{"tor"}
self.MediatorAccountFields = []string{"account"}
self.MediatorDestFields = []string{"destination"}
self.MediatorTimeAnswerFields = []string{"time_answer"}
self.MediatorDurationFields = []string{"duration"}
self.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
self.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
self.SMEnabled = false
@@ -173,6 +176,7 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
cfg := &CGRConfig{}
cfg.setDefaults()
var hasOpt bool
var errParse error
if hasOpt = c.HasOption("global", "datadb_type"); hasOpt {
cfg.DataDBType, _ = c.GetString("global", "datadb_type")
}
@@ -251,6 +255,11 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("cdrs", "mediator"); hasOpt {
cfg.CDRSMediator, _ = c.GetString("cdrs", "mediator")
}
if hasOpt = c.HasOption("cdrs", "extra_fields"); hasOpt {
if cfg.CDRSExtraFields, errParse = ConfigSlice( c, "cdrs", "extra_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
}
@@ -270,31 +279,49 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
cfg.MediatorAccIdField, _ = c.GetString("mediator", "accid_field")
}
if hasOpt = c.HasOption("mediator", "subject_fields"); hasOpt {
cfg.MediatorSubjectFields, _ = c.GetString("mediator", "subject_fields")
if cfg.MediatorSubjectFields, errParse = ConfigSlice( c, "mediator", "subject_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "reqtype_fields"); hasOpt {
cfg.MediatorReqTypeFields, _ = c.GetString("mediator", "reqtype_fields")
if cfg.MediatorReqTypeFields, errParse = ConfigSlice( c, "mediator", "reqtype_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "direction_fields"); hasOpt {
cfg.MediatorDirectionFields, _ = c.GetString("mediator", "direction_fields")
if cfg.MediatorDirectionFields, errParse = ConfigSlice( c, "mediator", "direction_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "tenant_fields"); hasOpt {
cfg.MediatorTenantFields, _ = c.GetString("mediator", "tenant_fields")
if cfg.MediatorTenantFields, errParse = ConfigSlice( c, "mediator", "tenant_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "tor_fields"); hasOpt {
cfg.MediatorTORFields, _ = c.GetString("mediator", "tor_fields")
if cfg.MediatorTORFields, errParse = ConfigSlice( c, "mediator", "tor_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "account_fields"); hasOpt {
cfg.MediatorAccountFields, _ = c.GetString("mediator", "account_fields")
if cfg.MediatorAccountFields, errParse = ConfigSlice( c, "mediator", "account_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "destination_fields"); hasOpt {
cfg.MediatorDestFields, _ = c.GetString("mediator", "destination_fields")
if cfg.MediatorDestFields, errParse = ConfigSlice( c, "mediator", "destination_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "time_start_fields"); hasOpt {
cfg.MediatorTimeStartFields, _ = c.GetString("mediator", "time_start_fields")
if hasOpt = c.HasOption("mediator", "time_answer_fields"); hasOpt {
if cfg.MediatorTimeAnswerFields, errParse = ConfigSlice( c, "mediator", "time_answer_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "duration_fields"); hasOpt {
cfg.MediatorDurationFields, _ = c.GetString("mediator", "duration_fields")
if cfg.MediatorDurationFields, errParse = ConfigSlice( c, "mediator", "duration_fields"); errParse!=nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt {
cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir")

View File

@@ -58,22 +58,23 @@ func TestDefaults(t *testing.T) {
eCfg.SchedulerEnabled = false
eCfg.CDRSListen = "127.0.0.1:2022"
eCfg.CDRSfsJSONEnabled = false
eCfg.CDRSMediator = INTERNAL
eCfg.CDRSMediator = INTERNAL
eCfg.CDRSExtraFields = []string{}
eCfg.MediatorEnabled = false
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
eCfg.MediatorRaterReconnects = 3
eCfg.MediatorCDRType = "freeswitch_http_json"
eCfg.MediatorAccIdField = "accid"
eCfg.MediatorSubjectFields = "subject"
eCfg.MediatorReqTypeFields = "reqtype"
eCfg.MediatorDirectionFields = "direction"
eCfg.MediatorTenantFields = "tenant"
eCfg.MediatorTORFields = "tor"
eCfg.MediatorAccountFields = "account"
eCfg.MediatorDestFields = "destination"
eCfg.MediatorTimeStartFields = "time_start"
eCfg.MediatorDurationFields = "duration"
eCfg.MediatorSubjectFields = []string{"subject"}
eCfg.MediatorReqTypeFields = []string{"reqtype"}
eCfg.MediatorDirectionFields = []string{"direction"}
eCfg.MediatorTenantFields = []string{"tenant"}
eCfg.MediatorTORFields = []string{"tor"}
eCfg.MediatorAccountFields = []string{"account"}
eCfg.MediatorDestFields = []string{"destination"}
eCfg.MediatorTimeAnswerFields = []string{"time_answer"}
eCfg.MediatorDurationFields = []string{"duration"}
eCfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
eCfg.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
eCfg.SMEnabled = false
@@ -146,21 +147,22 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CDRSListen = "test"
eCfg.CDRSfsJSONEnabled = true
eCfg.CDRSMediator = "test"
eCfg.CDRSExtraFields = []string{"test"}
eCfg.MediatorEnabled = true
eCfg.MediatorListen = "test"
eCfg.MediatorRater = "test"
eCfg.MediatorRaterReconnects = 99
eCfg.MediatorCDRType = "test"
eCfg.MediatorAccIdField = "test"
eCfg.MediatorSubjectFields = "test"
eCfg.MediatorReqTypeFields = "test"
eCfg.MediatorDirectionFields = "test"
eCfg.MediatorTenantFields = "test"
eCfg.MediatorTORFields = "test"
eCfg.MediatorAccountFields = "test"
eCfg.MediatorDestFields = "test"
eCfg.MediatorTimeStartFields = "test"
eCfg.MediatorDurationFields = "test"
eCfg.MediatorSubjectFields = []string{"test"}
eCfg.MediatorReqTypeFields = []string{"test"}
eCfg.MediatorDirectionFields = []string{"test"}
eCfg.MediatorTenantFields = []string{"test"}
eCfg.MediatorTORFields = []string{"test"}
eCfg.MediatorAccountFields = []string{"test"}
eCfg.MediatorDestFields = []string{"test"}
eCfg.MediatorTimeAnswerFields = []string{"test"}
eCfg.MediatorDurationFields = []string{"test"}
eCfg.MediatorCDRInDir = "test"
eCfg.MediatorCDROutDir = "test"
eCfg.SMEnabled = true

45
config/helpers.go Normal file
View File

@@ -0,0 +1,45 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"code.google.com/p/goconf/conf"
"errors"
"strings"
)
// Adds support for slice values in config
func ConfigSlice( c *conf.ConfigFile, section, valName string ) ([]string, error) {
sliceStr, errGet := c.GetString(section, valName)
if errGet != nil {
return nil, errGet
}
cfgValStrs := strings.Split(sliceStr, ",") // If need arrises, we can make the separator configurable
if len(cfgValStrs)==1 && cfgValStrs[0]=="" { // Prevents returning iterable with empty value
return []string{}, nil
}
for _,elm := range cfgValStrs {
if elm == "" { //One empty element is presented when splitting empty string
return nil, errors.New("Empty values in config slice")
}
}
return cfgValStrs, nil
}

View File

@@ -1,71 +1,72 @@
# Test Data.
# NOT A REAL CONFIGURATION FILE
# TEST DATA - NOT FOR PRODUCTION USAGE
#
[global]
datadb_type = test # The main database: <redis>.
datadb_host = test # Database host address.
datadb_host = test # Database host address.
datadb_port = test # Port to reach the database.
datadb_name = test # The name of the database to connect to.
datadb_user = test # Username to use when connecting to database.
datadb_passwd = test # Password to use when connecting to database.
logdb_type = test # Log/stored database type to use: <same|postgres|mongo|redis>
logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets.
datadb_passwd = test # Password to use when connecting to database.
logdb_type = test # Log/stored database type to use: <same|postgres|mongo|redis>
logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets.
logdb_port = test # The port to reach the logdb.
logdb_name = test # The name of the log database to connect to.
logdb_user = test # Username to use when connecting to logdb.
logdb_user = test # Username to use when connecting to logdb.
logdb_passwd = test # Password to use when connecting to logdb.
rpc_encoding = test # RPC encoding used on APIs: <gob|json>.
default_reqtype = test # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
default_reqtype = test # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
default_tor = test # Default Type of Record to consider when missing from requests.
default_tenant = test # Default Tenant to consider when missing from requests.
default_subject = test # Default rating Subject to consider when missing from requests.
[balancer]
enabled = true # Start Balancer service: <true|false>.
listen = test # Balancer listen interface: <disabled|x.y.z.y:1234>.
enabled = true # Start Balancer service: <true|false>.
listen = test # Balancer listen interface: <disabled|x.y.z.y:1234>.
[rater]
enabled = true # Enable Rater service: <true|false>.
enabled = true # Enable Rater service: <true|false>.
balancer = test # Register to Balancer as worker: <enabled|disabled>.
listen = test # Rater's listening interface: <internal|x.y.z.y:1234>.
listen = test # Rater's listening interface: <internal|x.y.z.y:1234>.
[scheduler]
enabled = true # Starts Scheduler service: <true|false>.
enabled = true # Starts Scheduler service: <true|false>.
[cdrs]
listen = test # CDRS's listening interface: <x.y.z.y:1234>.
freeswitch_json_enabled = true # Enable the handler for FreeSWITCH JSON CDRs: <true|false>.
mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
listen=test # CDRS's listening interface: <x.y.z.y:1234>.
freeswitch_json_enabled=true # Enable the handler for FreeSWITCH JSON CDRs: <true|false>.
mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
extra_fields = test # Extra fields to store in CDRs
[mediator]
enabled = true # Starts Mediator service: <true|false>.
listen = test # Mediator's listening interface: <internal>.
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
enabled = true # Starts Mediator service: <true|false>.
listen=test # Mediator's listening interface: <internal>.
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
rater_reconnects = 99 # Number of reconnects to rater before giving up.
cdr_type = test # CDR type <freeswitch_http_json|freeswitch_file_csv>.
cdr_type = test # CDR type <freeswitch_http_json|freeswitch_file_csv>.
accid_field = test # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
reqtype_fields = test # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
direction_fields = test # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
tenant_fields = test # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
direction_fields = test # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
tenant_fields = test # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
tor_fields = test # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
time_start_fields = test # Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
time_answer_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.
switch_type = test # Defines the type of switch behind: <freeswitch>.
rater = test # Address where to reach the Rater.
enabled = true # Starts SessionManager service: <true|false>.
switch_type = test # Defines the type of switch behind: <freeswitch>.
rater = test # Address where to reach the Rater.
rater_reconnects = 99 # Number of reconnects to rater before giving up.
debit_interval = 99 # Interval to perform debits on.
[freeswitch]
server = test # Adress where to connect to FreeSWITCH socket.
passwd = test # FreeSWITCH socket password.
reconnects = 99 # Number of attempts on connect failure.
server = test # Adress where to connect to FreeSWITCH socket.
passwd = test # FreeSWITCH socket password.
reconnects = 99 # Number of attempts on connect failure.

View File

@@ -1,72 +0,0 @@
# CGRateS Configuration file
#
# This file contains the default configuration hardcoded into CGRateS.
# This is what you get when you load CGRateS with an empty configuration file.
# [global] must exist in all files, rest of the configuration is inter-changeable.
[global]
# datadb_type = redis # The main database: <redis>.
# datadb_host = 127.0.0.1 # Database host address.
# datadb_port = 6379 # Port to reach the database.
# datadb_name = 10 # The name of the database to connect to.
# datadb_user = # Username to use when connecting to database.
# datadb_passwd = # Password to use when connecting to database.
# logdb_type = mongo # Log/stored database type to use: <same|postgres|mongo|redis>
# logdb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
# logdb_port = 27017 # The port to reach the logdb.
# logdb_name = cgrates # The name of the log database to connect to.
# logdb_user = # Username to use when connecting to logdb.
# logdb_passwd = # Password to use when connecting to logdb.
[balancer]
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
# rpc_encoding = gob # RPC encoding used: <gob|json>.
[rater]
enabled = true # Enable Rater service: <true|false>.
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
# rpc_encoding = gob # RPC encoding used: <gob|json>.
[scheduler]
enabled = true # Starts Scheduler service: <true|false>.
[mediator]
enabled = true # Starts Mediator service: <true|false>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
# skipdb = false # Skips database checks for previous recorded prices: <true|false>.
# pseudoprepaid = false # Execute debits together with pricing: <true|false>.
# cdr_type = freeswitch_cdr # CDR type <freeswitch_cdr>.
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept.
# cdr_out_dir = /var/log/cgrates/cdr_out # Absolute path towards the directory where processed CDRs will be exported.
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
# rpc_encoding = gob # RPC encoding used when talking to Rater: <gob|json>.
default_reqtype = prepaid # Default request type to consider when missing from requests: <""|prepaid|postpaid>.
# default_tor = 0 # Default Type of Record to consider when missing from requests.
# default_tenant = 0 # Default Tenant to consider when missing from requests.
# default_subject = 0 # Default rating Subject to consider when missing from requests.
[freeswitch]
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
# passwd = ClueCon # FreeSWITCH socket password.
# reconnects = 5 # Number of attempts on connect failure.
# uuid_index = 10 # Index of the UUID info in the CDR file.
# direction_index = -1 # Index of the CallDirection info in the CDR file.
# tor_index = -1 # Index of the TypeOfRecord info in the CDR file.
# tenant_index = -1 # Index of the Tenant info in the CDR file.
# subject_index = -1 # Index of the Subject info in the CDR file. -1 to query database instead of rater
# account_index = -1 # Index of the Account info in the CDR file.
# destination_index = -1 # Index of the Destination info in the CDR file.
# time_start_index = -1 # Index of the TimeStart info in the CDR file.
# duration_index = -1 # Index of the CallDuration info in the CDR file.

View File

@@ -40,6 +40,7 @@
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
# freeswitch_json_enabled=false # Enable the handler for FreeSWITCH JSON CDRs: <true|false>.
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# extra_fields = # Extra fields to store in CDRs
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
@@ -49,13 +50,13 @@
# cdr_type = freeswitch_http_json # CDR type <freeswitch_http_json|freeswitch_file_csv>.
# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
# subject_fields = subject # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
# reqtype_fields = -1 # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_start_fields = time_start # Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).

View File

@@ -13,7 +13,7 @@ CREATE TABLE `cdrs_primary` (
`account` varchar(64) NOT NULL,
`subject` varchar(64) NOT NULL,
`destination` varchar(64) NOT NULL,
`time_start` datetime NOT NULL,
`time_answer` datetime NOT NULL,
`duration` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `cgrid` (`cgrid`)

121
mediator/fsfilecsvcdr.go Normal file
View File

@@ -0,0 +1,121 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package mediator
import (
"time"
"strconv"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
type FScsvCDR struct {
rowData []string // The original row extracted form csv file
accIdIdx,
subjectIdx,
reqtypeIdx,
directionIdx,
tenantIdx,
torIdx,
accountIdx,
destinationIdx,
answerTimeIdx,
durationIdx int // Field indexes
cgrCfg *config.CGRConfig // CGR Config instance
}
func NewFScsvCDR(cdrRow []string, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx, torIdx,
accountIdx, destinationIdx, answerTimeIdx, durationIdx int, cfg *config.CGRConfig ) (*FScsvCDR, error) {
fscdr := FScsvCDR{ cdrRow, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx,
torIdx, accountIdx, destinationIdx, answerTimeIdx, durationIdx, cfg }
return &fscdr, nil
}
func (self *FScsvCDR) GetCgrId() string {
return utils.FSCgrId(self.rowData[self.accIdIdx])
}
func (self *FScsvCDR) GetAccId() string {
return self.rowData[self.accIdIdx]
}
func (self *FScsvCDR) GetCdrHost() string {
return utils.LOCALHOST // ToDo: Maybe extract dynamically the external IP address here
}
func (self *FScsvCDR) GetDirection() string {
return "OUT"
}
func (self *FScsvCDR) GetOrigId() string {
return utils.NOT_IMPLEMENTED
}
func (self *FScsvCDR) GetSubject() string {
return self.rowData[self.subjectIdx]
}
func (self *FScsvCDR) GetAccount() string {
return self.rowData[self.accountIdx]
}
func (self *FScsvCDR) GetDestination() string {
return self.rowData[self.destinationIdx]
}
func (self *FScsvCDR) GetTOR() string {
return self.rowData[self.torIdx]
}
func (self *FScsvCDR) GetTenant() string {
return self.rowData[self.tenantIdx]
}
func (self *FScsvCDR) GetReqType() string {
if self.reqtypeIdx == -1 {
return self.cgrCfg.DefaultReqType
}
return self.rowData[self.reqtypeIdx]
}
func (self *FScsvCDR) GetAnswerTime() (time.Time, error) {
return time.Parse("2006-01-02 15:04:05", self.rowData[self.answerTimeIdx])
}
func (self *FScsvCDR) GetDuration() int64 {
dur, _ := strconv.ParseInt(self.rowData[self.durationIdx], 0, 64)
return dur
}
func (self *FScsvCDR) GetFallbackSubj() string {
return utils.NOT_IMPLEMENTED
}
func (self *FScsvCDR) GetExtraFields() map[string]string {
return nil
}

View File

@@ -19,11 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package mediator
import (
"errors"
"bufio"
"encoding/csv"
"flag"
"fmt"
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/howeyc/fsnotify"
"os"
"path"
@@ -32,107 +35,168 @@ import (
"time"
)
type mediatorFieldIdxs []int
// Extends goconf to provide us the slice with indexes we need for multiple mediation
func (mfi *mediatorFieldIdxs) Load(idxs string) error {
cfgStrIdxs := strings.Split(idxs, ",")
if len(cfgStrIdxs) == 0 {
return fmt.Errorf("Undefined %s", idxs)
func NewMediator( connector rater.Connector, storDb rater.DataStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
m = &Mediator{
connector: connector,
storDb: storDb,
cgrCfg: cfg,
}
for _, cfgStrIdx := range cfgStrIdxs {
if cfgIntIdx, errConv := strconv.Atoi(cfgStrIdx); errConv != nil || cfgStrIdx == "" {
return fmt.Errorf("All mediator index members (%s) must be ints", idxs)
} else {
*mfi = append(*mfi, cfgIntIdx)
}
m.fieldNames = make( map[string][]string )
m.fieldIdxs = make( map[string][]int )
// Load config fields
if errLoad := m.loadConfig(); errLoad != nil {
return nil, errLoad
}
return nil
return m, nil
}
type Mediator struct {
connector rater.Connector
loggerDb rater.DataStorage
outputDir string
directionIndexs,
torIndexs,
tenantIndexs,
subjectIndexs,
accountIndexs,
destinationIndexs,
timeStartIndexs,
durationIndexs,
uuidIndexs mediatorFieldIdxs
storDb rater.DataStorage
cgrCfg *config.CGRConfig
cdrInDir, cdrOutDir string
accIdField string
accIdIdx int // Populated only for csv files where we have no names but indexes for the fields
fieldNames map[string][]string
fieldIdxs map[string][]int // Populated only for csv files where we have no names but indexes for the fields
}
// Creates a new mediator object parsing the indexses
func NewMediator(connector rater.Connector,
loggerDb rater.DataStorage,
cfg *config.CGRConfig) (m *Mediator, err error) {
m = &Mediator{
connector: connector,
loggerDb: loggerDb,
outputDir: outputDir,
// Load configuration out of config fields and does the necessary checks
func (self *Mediator) loadConfig() error {
fieldKeys := []string{"subject", "reqtype", "direction", "tenant", "tor", "account", "destination", "time_start", "duration"}
cfgVals := [][]string{self.cgrCfg.MediatorSubjectFields, self.cgrCfg.MediatorReqTypeFields, self.cgrCfg.MediatorDirectionFields,
self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields,
self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields}
refIdx := 0 // Subject becomes reference for our checks
if len(cfgVals[refIdx]) == 0 {
return fmt.Errorf("Unconfigured %s fields", fieldKeys[refIdx])
}
idxs := []string{directionIndexs, torIndexs, tenantIndexs, subjectIndexs, accountIndexs,
destinationIndexs, timeStartIndexs, durationIndexs, uuidIndexs}
objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs,
&m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs}
for i, o := range objs {
err = o.Load(idxs[i])
if err != nil {
return
// All other configured fields must match the length of reference fields
for iCfgVal := range cfgVals {
if ( len(cfgVals[refIdx])!=len(cfgVals[iCfgVal]) ) {
// Make sure we have everywhere the length of reference key (subject)
return errors.New("Inconsistent lenght of mediator fields.")
}
}
if !m.validateIndexses() {
err = fmt.Errorf("All members must have the same length")
// AccIdField has no special requirements, should just exist
if self.cgrCfg.MediatorAccIdField == "" {
return errors.New("Undefined mediator accid field")
}
return
self.accIdField = self.cgrCfg.MediatorAccIdField
var errConv error
// Specific settings of CSV style CDRS
if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV {
// Check paths to be valid before adding as configuration
if _, err := os.Stat(self.cgrCfg.MediatorCDRInDir); err != nil {
return fmt.Errorf("The input path for mediator does not exist: %v", self.cgrCfg.MediatorCDRInDir)
} else {
self.cdrInDir = self.cgrCfg.MediatorCDRInDir
}
if _, err := os.Stat(self.cgrCfg.MediatorCDROutDir); err != nil {
return fmt.Errorf("The output path for mediator does not exist: %v", self.cgrCfg.MediatorCDROutDir)
} else {
self.cdrOutDir = self.cgrCfg.MediatorCDROutDir
}
if self.accIdIdx, errConv = strconv.Atoi(self.cgrCfg.MediatorAccIdField); errConv != nil {
return errors.New("AccIdIndex must be integer.")
}
}
// Load here field names and convert to integers in case of unamed cdrs like CSV
for idx, key := range fieldKeys {
self.fieldNames[key] = cfgVals[idx]
if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV { // Special case when field names represent indexes of their location in file
self.fieldIdxs[key] = make( []int, len(cfgVals[idx]) )
for iStr, cfgStr := range cfgVals[idx] {
if self.fieldIdxs[key][iStr], errConv = strconv.Atoi(cfgStr); errConv != nil {
return fmt.Errorf("All mediator index members (%s) must be ints", key)
}
}
}
}
return nil
}
// Make sure all indexes are having same lenght
func (m *Mediator) validateIndexses() bool {
refLen := len(m.subjectIndexs)
for _, fldIdxs := range []mediatorFieldIdxs{m.directionIndexs, m.torIndexs, m.tenantIndexs,
m.accountIndexs, m.destinationIndexs, m.timeStartIndexs, m.durationIndexs, m.uuidIndexs} {
if len(fldIdxs) != refLen {
return false
}
}
return true
}
// Watch the specified folder for file moves and parse the files on events
func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) {
func (self *Mediator) TrackCDRFiles() (err error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return
}
defer watcher.Close()
err = watcher.Watch(cdrPath)
err = watcher.Watch(self.cdrInDir)
if err != nil {
return
}
rater.Logger.Info(fmt.Sprintf("Monitoring %v for file moves.", cdrPath))
rater.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cdrInDir))
for {
select {
case ev := <-watcher.Event:
if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
rater.Logger.Info(fmt.Sprintf("Parsing: %v", ev.Name))
err = m.parseCSV(ev.Name)
rater.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
err = self.MediateCSVCDR(ev.Name)
if err != nil {
return err
}
}
case err := <-watcher.Error:
rater.Logger.Err(fmt.Sprintf("Inotify error: %v", err))
rater.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
}
}
return
}
// Retrive the cost from logging database
func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *rater.CallCost, err error) {
return self.storDb.GetCallCostLog(cdr.GetCgrId(), rater.SESSION_MANAGER_SOURCE)
}
// Retrive the cost from rater
func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*rater.CallCost, error) {
cc := &rater.CallCost{}
d, err := time.ParseDuration(strconv.FormatInt(cdr.GetDuration(),10) + "s")
if err != nil {
return nil, err
}
if d.Seconds() == 0 { // failed call, returning empty callcost, no error
return cc, nil
}
t1, err := cdr.GetAnswerTime()
if err != nil {
return nil, err
}
cd := rater.CallDescriptor{
Direction: "OUT", //record[m.directionFields[runIdx]] TODO: fix me
Tenant: cdr.GetTenant(),
TOR: cdr.GetTOR(),
Subject: cdr.GetSubject(),
Account: cdr.GetAccount(),
Destination: cdr.GetDestination(),
TimeStart: t1,
TimeEnd: t1.Add(d)}
if cdr.GetReqType()== utils.PSEUDOPREPAID {
err = self.connector.Debit(cd, cc)
} else {
err = self.connector.GetCost(cd, cc)
}
if err != nil {
self.storDb.LogError(cdr.GetCgrId(), rater.MEDIATOR_SOURCE, err.Error())
} else {
// If the mediator calculated a price it will write it to logdb
self.storDb.LogCallCost(cdr.GetCgrId(), rater.MEDIATOR_SOURCE, cc)
}
return cc, nil
}
// Parse the files and get cost for every record
func (m *Mediator) parseCSV(cdrfn string) (err error) {
func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) {
flag.Parse()
file, err := os.Open(cdrfn)
defer file.Close()
@@ -143,7 +207,7 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
csvReader := csv.NewReader(bufio.NewReader(file))
_, fn := path.Split(cdrfn)
fout, err := os.Create(path.Join(m.outputDir, fn))
fout, err := os.Create(path.Join(self.cdrOutDir, fn))
if err != nil {
return err
}
@@ -153,18 +217,36 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
//t, _ := time.Parse("2006-01-02 15:04:05", record[5])
var cc *rater.CallCost
for runIdx, idxVal := range m.subjectIndexs { // Query costs for every run index given by subject
if idxVal == -1 { // -1 as subject means use database to get previous set price
cc, err = m.getCostsFromDB(record, runIdx)
for runIdx := range self.fieldIdxs["subject"] { // Query costs for every run index given by subject
csvCDR, errCDR := NewFScsvCDR( record, self.accIdIdx,
self.fieldIdxs["subject"][runIdx],
self.fieldIdxs["reqtype"][runIdx],
self.fieldIdxs["direction"][runIdx],
self.fieldIdxs["tenant"][runIdx],
self.fieldIdxs["tor"][runIdx],
self.fieldIdxs["account"][runIdx],
self.fieldIdxs["destination"][runIdx],
self.fieldIdxs["answer_time"][runIdx],
self.fieldIdxs["duration"][runIdx],
self.cgrCfg)
if errCDR != nil {
rater.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for accid: <%s>, err: <%s>",
record[self.accIdIdx], errCDR.Error()))
}
var errCost error
if (csvCDR.GetReqType()==utils.PREPAID || csvCDR.GetReqType()==utils.POSTPAID){
// Should be previously calculated and stored in DB
cc, errCost = self.getCostsFromDB( csvCDR )
} else {
cc, err = m.getCostsFromRater(record, runIdx)
cc, errCost = self.getCostsFromRater( csvCDR )
}
cost := "-1"
if err != nil || cc == nil {
rater.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for uuid: <%s>, err: <%s>, cost: <%v>", record[m.uuidIndexs[runIdx]], err.Error(), cc))
if errCost != nil || cc == nil {
rater.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for accid: <%s>, err: <%s>, cost: <%v>", csvCDR.GetAccId(), err.Error(), cc))
} else {
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
rater.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, cost: %v", record[m.uuidIndexs[runIdx]], cost))
rater.Logger.Debug(fmt.Sprintf("Calculated for accid:%s, cost: %v", csvCDR.GetAccId(), cost))
}
record = append(record, cost)
}
@@ -174,75 +256,23 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
return
}
// Retrive the cost from logging database
func (m *Mediator) getCostsFromDB(record []string, runIdx int) (cc *rater.CallCost, err error) {
searchedUUID := record[m.uuidIndexs[runIdx]]
cc, err = m.loggerDb.GetCallCostLog(searchedUUID, rater.SESSION_MANAGER_SOURCE)
return
}
// Retrive the cost from rater
func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.CallCost, err error) {
d, err := time.ParseDuration(record[m.durationIndexs[runIdx]] + "s")
if err != nil {
return
}
cc = &rater.CallCost{}
if d.Seconds() == 0 { // failed call, returning empty callcost, no error
return cc, nil
}
t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndexs[runIdx]])
if err != nil {
return
}
cd := rater.CallDescriptor{
Direction: "OUT", //record[m.directionIndexs[runIdx]] TODO: fix me
Tenant: record[m.tenantIndexs[runIdx]],
TOR: record[m.torIndexs[runIdx]],
Subject: record[m.subjectIndexs[runIdx]],
Account: record[m.accountIndexs[runIdx]],
Destination: record[m.destinationIndexs[runIdx]],
TimeStart: t1,
TimeEnd: t1.Add(d)}
if cfg.DefaultReqType == PSEUDOPREPAID { //ToDo: Implement here dynamically getting the tor out of record instance
err = m.connector.Debit(cd, cc)
func (self *Mediator) MediateDBCDR(cdr utils.CDR, db rater.DataStorage) error {
var cc *rater.CallCost
var errCost error
if (cdr.GetReqType()==utils.PREPAID || cdr.GetReqType()==utils.POSTPAID){
// Should be previously calculated and stored in DB
cc, errCost = self.getCostsFromDB( cdr )
} else {
err = m.connector.GetCost(cd, cc)
cc, errCost = self.getCostsFromRater( cdr )
}
if err != nil {
m.loggerDb.LogError(record[m.uuidIndexs[runIdx]], rater.MEDIATOR_SOURCE, err.Error())
cost := "-1"
if errCost != nil || cc == nil {
rater.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for cgrid: <%s>, err: <%s>, cost: <%v>", cdr.GetCgrId(), errCost.Error(), cc))
} else {
// If the mediator calculated a price it will write it to logdb
m.loggerDb.LogCallCost(record[m.uuidIndexs[runIdx]], rater.MEDIATOR_SOURCE, cc)
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
rater.Logger.Debug(fmt.Sprintf("<Mediator> Calculated for cgrid:%s, cost: %v", cdr.GetCgrId(), cost))
}
return
return self.storDb.SetRatedCdr(cdr, cc)
}
/* Calculates price for the specified cdr and writes the new cdr with price to
the storage. If the cdr is nil then it will fetch it from the storage. */
func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error {
cc := &rater.CallCost{}
startTime, err := cdr.GetStartTime()
if err != nil {
return err
}
endTime, err := cdr.GetEndTime()
if err != nil {
return err
}
cd := rater.CallDescriptor{
Direction: cdr.GetDirection(),
Tenant: cdr.GetTenant(),
TOR: cdr.GetTOR(),
Subject: cdr.GetSubject(),
Account: cdr.GetAccount(),
Destination: cdr.GetDestination(),
TimeStart: startTime,
TimeEnd: endTime}
if err := m.connector.GetCost(cd, cc); err != nil {
fmt.Println("Got error in the mediator getCost", err.Error())
return err
}
return db.SetRatedCdr(cdr, cc)
}

View File

@@ -1,45 +1,10 @@
package mediator
import (
"testing"
//"testing"
)
func TestIndexLoad(t *testing.T) {
idxs := make(mediatorFieldIdxs, 0)
err := idxs.Load("1,2,3")
if err != nil && len(idxs) != 3 {
t.Error("Error parsing indexses ", err)
}
}
func TestIndexLoadError(t *testing.T) {
idxs := make(mediatorFieldIdxs, 0)
err := idxs.Load("1,a,2")
if err == nil {
t.Error("Error parsing indexses ", err)
}
}
func TestIndexLoadEmpty(t *testing.T) {
idxs := make(mediatorFieldIdxs, 0)
err := idxs.Load("")
if err == nil {
t.Error("Error parsing indexses ", err)
}
}
func TestIndexLengthSame(t *testing.T) {
m := new(Mediator)
objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs,
&m.accountIndexs, &m.destinationIndexs, &m.timeStartIndexs, &m.durationIndexs, &m.uuidIndexs}
for _, o := range objs {
o.Load("1,2,3")
}
if !m.validateIndexses() {
t.Error("Error checking length")
}
}
/*
func TestIndexLengthDifferent(t *testing.T) {
m := new(Mediator)
objs := []*mediatorFieldIdxs{&m.directionIndexs, &m.torIndexs, &m.tenantIndexs, &m.subjectIndexs,
@@ -63,3 +28,4 @@ func TestLoad(t *testing.T) {
t.Errorf("Expected %v was %v", 3, len(m.directionIndexs))
}
}
*/

View File

@@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
type GosexyStorage struct {
@@ -217,10 +218,10 @@ func (rs *GosexyStorage) LogError(uuid, source, errstr string) (err error) {
return
}
func (rs *GosexyStorage) SetCdr(CDR) error {
func (rs *GosexyStorage) SetCdr(utils.CDR) error {
return nil
}
func (rs *GosexyStorage) SetRatedCdr(CDR, *CallCost) error {
func (rs *GosexyStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/ugorji/go/codec"
"github.com/vmihailenco/msgpack"
"labix.org/v2/mgo/bson"
"github.com/cgrates/cgrates/utils"
)
const (
@@ -63,8 +64,8 @@ type DataStorage interface {
GetActionTimings(string) (ActionTimings, error)
SetActionTimings(string, ActionTimings) error
GetAllActionTimings() (map[string]ActionTimings, error)
SetCdr(CDR) error
SetRatedCdr(CDR, *CallCost) error
SetCdr(utils.CDR) error
SetRatedCdr(utils.CDR, *CallCost) error
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
LogCallCost(uuid, source string, cc *CallCost) error
LogError(uuid, source, errstr string) error

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
type MapStorage struct {
@@ -183,10 +184,10 @@ func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) {
return nil
}
func (ms *MapStorage) SetCdr(CDR) error {
func (ms *MapStorage) SetCdr(utils.CDR) error {
return nil
}
func (ms *MapStorage) SetRatedCdr(CDR, *CallCost) error {
func (ms *MapStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}

View File

@@ -24,6 +24,7 @@ import (
"labix.org/v2/mgo/bson"
"log"
"time"
"github.com/cgrates/cgrates/utils"
)
type MongoStorage struct {
@@ -208,10 +209,10 @@ func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) {
return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source})
}
func (ms *MongoStorage) SetCdr(CDR) error {
func (ms *MongoStorage) SetCdr(utils.CDR) error {
return nil
}
func (ms *MongoStorage) SetRatedCdr(CDR, *CallCost) error {
func (ms *MongoStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}

View File

@@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/cgrates/cgrates/utils"
)
type MySQLStorage struct {
@@ -122,8 +123,8 @@ func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as Act
}
func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return }
func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) {
startTime, err := cdr.GetStartTime()
func (mys *MySQLStorage) SetCdr(cdr utils.CDR) (err error) {
startTime, err := cdr.GetAnswerTime()
if err != nil {
return err
}
@@ -146,7 +147,7 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) {
}
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')",
cdr.GetCgrId(),
cdr.GetExtraParameters(),
cdr.GetExtraFields(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
@@ -155,7 +156,7 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) {
return
}
func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) {
func (mys *MySQLStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) {
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')",
cdr.GetCgrId(),
cc.Cost,

View File

@@ -22,6 +22,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/cgrates/cgrates/utils"
_ "github.com/bmizerany/pq"
)
@@ -122,8 +123,8 @@ func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as
}
func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return }
func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) {
startTime, err := cdr.GetStartTime()
func (psl *PostgresStorage) SetCdr(cdr utils.CDR) (err error) {
startTime, err := cdr.GetAnswerTime()
if err != nil {
return err
}
@@ -146,7 +147,7 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) {
}
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')",
cdr.GetCgrId(),
cdr.GetExtraParameters(),
cdr.GetExtraFields(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
@@ -155,7 +156,7 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) {
return
}
func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) {
func (psl *PostgresStorage) SetRatedCdr(cdr utils.CDR, cc *CallCost) (err error) {
if err != nil {
return err
}

View File

@@ -23,6 +23,7 @@ import (
"github.com/garyburd/redigo/redis"
//"log"
"time"
"github.com/cgrates/cgrates/utils"
)
type RedigoStorage struct {
@@ -215,10 +216,10 @@ func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) {
return
}
func (rs *RedigoStorage) SetCdr(CDR) error {
func (rs *RedigoStorage) SetCdr(utils.CDR) error {
return nil
}
func (rs *RedigoStorage) SetRatedCdr(CDR, *CallCost) error {
func (rs *RedigoStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}

View File

@@ -23,6 +23,7 @@ import (
"github.com/fzzy/radix/redis"
//"log"
"time"
"github.com/cgrates/cgrates/utils"
)
type RedisStorage struct {
@@ -242,10 +243,10 @@ func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) {
return
}
func (rs *RedisStorage) SetCdr(CDR) error {
func (rs *RedisStorage) SetCdr(utils.CDR) error {
return nil
}
func (rs *RedisStorage) SetRatedCdr(CDR, *CallCost) error {
func (rs *RedisStorage) SetRatedCdr(utils.CDR, *CallCost) error {
return nil
}

View File

@@ -50,8 +50,6 @@ const (
ANSWER = "CHANNEL_ANSWER"
HANGUP = "CHANNEL_HANGUP_COMPLETE"
PARK = "CHANNEL_PARK"
REQTYPE_PREPAID = "prepaid"
REQTYPE_POSTPAID = "postpaid"
AUTH_OK = "+AUTH_OK"
DISCONNECT = "+SWITCH DISCONNECT"
INSUFFICIENT_FUNDS = "-INSUFFICIENT_FUNDS"

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"log/syslog"
"net"
@@ -145,7 +146,7 @@ func (sm *FSSessionManager) OnChannelPark(ev Event) {
startTime = time.Now()
}
// if there is no account configured leave the call alone
if strings.TrimSpace(ev.GetReqType()) != REQTYPE_PREPAID {
if strings.TrimSpace(ev.GetReqType()) != utils.PREPAID {
return
}
if ev.MissingParameter() {
@@ -194,7 +195,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
return
}
defer s.Close() // Stop loop and save the costs deducted so far to database
if ev.GetReqType() == REQTYPE_POSTPAID {
if ev.GetReqType() == utils.POSTPAID {
startTime, err := ev.GetStartTime(START_TIME)
if err != nil {
rater.Logger.Crit("Error parsing postpaid call start time from event")

View File

@@ -21,8 +21,8 @@ package sessionmanager
import (
"fmt"
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/fsock"
"strings"
"time"
)
@@ -38,8 +38,8 @@ type Session struct {
// Creates a new session and starts the debit loop
func NewSession(ev Event, sm SessionManager) (s *Session) {
// Ignore calls which have nothing to do with CGRateS
if strings.TrimSpace(ev.GetReqType()) == "" {
// SesionManager only handles prepaid and postpaid calls
if ev.GetReqType() != utils.PREPAID && ev.GetReqType() != utils.POSTPAID {
return
}
// Make sure cgr_type is enforced even if not set by FreeSWITCH
@@ -68,9 +68,9 @@ func NewSession(ev Event, sm SessionManager) (s *Session) {
sm.DisconnectSession(s, MISSING_PARAMETER)
} else {
switch ev.GetReqType() {
case REQTYPE_PREPAID:
case utils.PREPAID:
go s.startDebitLoop()
case REQTYPE_POSTPAID:
case utils.POSTPAID:
// do not loop, make only one debit at hangup
}
}

View File

@@ -21,7 +21,6 @@ package sessionmanager
import (
"github.com/cgrates/cgrates/config"
"testing"
// "time"
)
var (
@@ -55,61 +54,7 @@ var (
### Test data, not for production usage
[global]
datadb_type = test #
datadb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets.
datadb_port = test # The port to bind to.
datadb_name = test # The name of the database to connect to.
datadb_user = test # The user to sign in as.
datadb_passwd = test # The user's password.root
logdb_type = test #
logdb_host = test # The host to connect to. Values that start with / are for UNIX domain sockets.
logdb_port = test # The port to bind to.
logdb_name = test # The name of the database to connect to.
logdb_user = test # The user to sign in as.
logdb_passwd = test # The user's password.root
[balancer]
enabled = true # Start balancer server
listen = test # Balancer listen interface
rpc_encoding = test # use JSON for RPC encoding
[rater]
enabled = true
listen = test # listening address host:port, internal for internal communication only
balancer = test # if defined it will register to balancer as worker
rpc_encoding = test # use JSON for RPC encoding
[mediator]
enabled = true
cdr_in_dir = test # Freeswitch Master CSV CDR path.
cdr_out_dir = test
rater = test #address where to access rater. Can be internal, direct rater address or the address of a balancer
rpc_encoding = test # use JSON for RPC encoding
skipdb = true
pseudoprepaid = true
[scheduler]
enabled = true
[session_manager]
enabled = true
switch_type = test
rater = test #address where to access rater. Can be internal, direct rater address or the address of a balancer
debit_interval = 11
rpc_encoding = test # use JSON for RPC encoding
[freeswitch]
server = test # freeswitch address host:port
passwd = test # freeswitch address host:port
direction_index = test
tor_index = test
tenant_index = test
subject_index = test
account_index = test
destination_index = test
time_start_index = test
duration_index = test
uuid_index = test
default_reqtype=
`)
)

View File

@@ -16,14 +16,13 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package rater
package utils
import (
"time"
)
type CDR interface {
New([]byte) (CDR, error)
GetCgrId() string
GetAccId() string
GetCdrHost() string
@@ -32,14 +31,11 @@ type CDR interface {
GetSubject() string
GetAccount() string
GetDestination() string
GetCallDestNr() string
GetTOR() string
GetUUID() string
GetTenant() string
GetReqType() string
GetStartTime() (time.Time, error)
GetEndTime() (time.Time, error)
GetAnswerTime() (time.Time, error)
GetDuration() int64
GetFallbackSubj() string
GetExtraParameters() string
GetExtraFields() map[string]string //Stores extra CDR Fields
}

13
utils/consts.go Normal file
View File

@@ -0,0 +1,13 @@
package utils
const (
LOCALHOST = "127.0.0.1"
FSCDR_FILE_CSV = "freeswitch_file_csv"
FSCDR_HTTP_JSON = "freeswitch_http_json"
NOT_IMPLEMENTED = "not implemented"
PREPAID = "prepaid"
POSTPAID = "postpaid"
PSEUDOPREPAID = "pseudoprepaid"
RATED = "rated"
)