mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 21:29:52 +05:00
Merge branch 'OsipsSM'
This commit is contained in:
@@ -1442,6 +1442,30 @@ func TestLocalGetCdrs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalProcessCdr(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
var reply string
|
||||
cdr := utils.StoredCdr{CgrId: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), OrderId: 123, TOR: utils.VOICE, AccId: "dsafdsaf",
|
||||
CdrHost: "192.168.1.1", CdrSource: "test", ReqType: "rated", Direction: "*out", Tenant: "cgrates.org", Category: "call", Account: "1001", Subject: "1001", Destination: "1002",
|
||||
SetupTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), MediationRunId: utils.DEFAULT_RUNID,
|
||||
Usage: time.Duration(10) * time.Second, ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, RatedAccount: "dan", RatedSubject: "dans",
|
||||
}
|
||||
if err := rater.Call("CDRSV1.ProcessCdr", cdr, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply received: ", reply)
|
||||
}
|
||||
var cdrs []*utils.StoredCdr
|
||||
req := utils.AttrGetCdrs{}
|
||||
if err := rater.Call("ApierV1.GetCdrs", req, &cdrs); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if len(cdrs) != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalSetDC(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) 2012-2014 ITsysCOM GmbH
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can Storagetribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@@ -16,4 +16,26 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package mediator
|
||||
package apier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Receive CDRs via RPC methods
|
||||
type CDRSV1 struct {
|
||||
CdrSrv *engine.CDRS
|
||||
}
|
||||
|
||||
func (cdrsrv *CDRSV1) ProcessCdr(cdr *utils.StoredCdr, reply *string) error {
|
||||
if cdrsrv.CdrSrv == nil {
|
||||
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, "CDRS_NOT_RUNNING")
|
||||
}
|
||||
if err := cdrsrv.CdrSrv.ProcessCdr(cdr); err != nil {
|
||||
return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error())
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
@@ -20,13 +20,13 @@ package apier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/mediator"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MediatorV1 struct {
|
||||
Medi *mediator.Mediator
|
||||
Medi *engine.Mediator
|
||||
}
|
||||
|
||||
// Remotely start mediation with specific runid, runs asynchronously, it's status will be displayed in syslog
|
||||
|
||||
@@ -32,7 +32,6 @@ import (
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/cgrates/cgrates/cdrs"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/howeyc/fsnotify"
|
||||
@@ -43,7 +42,7 @@ const (
|
||||
FS_CSV = "freeswitch_csv"
|
||||
)
|
||||
|
||||
func NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, csvSep string, cdrFields map[string][]*utils.RSRField, cdrServer *cdrs.CDRS) (*Cdrc, error) {
|
||||
func NewCdrc(cdrsAddress, cdrType, cdrInDir, cdrOutDir, cdrSourceId string, runDelay time.Duration, csvSep string, cdrFields map[string][]*utils.RSRField, cdrServer *engine.CDRS) (*Cdrc, error) {
|
||||
if len(csvSep) != 1 {
|
||||
return nil, fmt.Errorf("Unsupported csv separator: %s", csvSep)
|
||||
}
|
||||
@@ -69,7 +68,7 @@ type Cdrc struct {
|
||||
runDelay time.Duration
|
||||
csvSep rune
|
||||
cdrFields map[string][]*utils.RSRField
|
||||
cdrServer *cdrs.CDRS // Reference towards internal cdrServer if that is the case
|
||||
cdrServer *engine.CDRS // Reference towards internal cdrServer if that is the case
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
@@ -220,7 +219,7 @@ func (self *Cdrc) processFile(filePath string) error {
|
||||
continue
|
||||
}
|
||||
if self.cdrsAddress == utils.INTERNAL {
|
||||
if err := self.cdrServer.ProcessRawCdr(storedCdr); err != nil {
|
||||
if err := self.cdrServer.ProcessCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, row: %d, error: %s", procRowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -22,8 +22,8 @@ import (
|
||||
//"bytes"
|
||||
//"encoding/csv"
|
||||
//"fmt"
|
||||
"github.com/cgrates/cgrates/cdrs"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
//"io"
|
||||
"reflect"
|
||||
@@ -37,7 +37,7 @@ func TestRecordForkCdr(t *testing.T) {
|
||||
cgrConfig.CdrcCdrFields["supplier"] = []*utils.RSRField{&utils.RSRField{Id: "14"}}
|
||||
csvSepRune, _ := utf8.DecodeRune([]byte(cgrConfig.CdrcCsvSep))
|
||||
cdrc := &Cdrc{cgrConfig.CdrcCdrs, cgrConfig.CdrcCdrType, cgrConfig.CdrcCdrInDir, cgrConfig.CdrcCdrOutDir, cgrConfig.CdrcSourceId, cgrConfig.CdrcRunDelay, csvSepRune,
|
||||
cgrConfig.CdrcCdrFields, new(cdrs.CDRS), nil}
|
||||
cgrConfig.CdrcCdrFields, new(engine.CDRS), nil}
|
||||
cdrRow := []string{"firstField", "secondField"}
|
||||
_, err := cdrc.recordToStoredCdr(cdrRow)
|
||||
if err == nil {
|
||||
|
||||
@@ -31,11 +31,9 @@ import (
|
||||
"github.com/cgrates/cgrates/apier"
|
||||
"github.com/cgrates/cgrates/balancer2go"
|
||||
"github.com/cgrates/cgrates/cdrc"
|
||||
"github.com/cgrates/cgrates/cdrs"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/history"
|
||||
"github.com/cgrates/cgrates/mediator"
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/sessionmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -51,6 +49,7 @@ const (
|
||||
REDIS = "redis"
|
||||
SAME = "same"
|
||||
FS = "freeswitch"
|
||||
OSIPS = "opensips"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -66,9 +65,9 @@ var (
|
||||
exitChan = make(chan bool)
|
||||
server = &engine.Server{}
|
||||
scribeServer history.Scribe
|
||||
cdrServer *cdrs.CDRS
|
||||
cdrServer *engine.CDRS
|
||||
sm sessionmanager.SessionManager
|
||||
medi *mediator.Mediator
|
||||
medi *engine.Mediator
|
||||
cfg *config.CGRConfig
|
||||
err error
|
||||
)
|
||||
@@ -96,8 +95,8 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
var client *rpcclient.RpcClient
|
||||
var err error
|
||||
|
||||
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.MediatorRater, 0, cfg.MediatorRaterReconnects, utils.GOB)
|
||||
for i := 0; i < cfg.MediatorReconnects; i++ {
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.MediatorRater, 0, cfg.MediatorReconnects, utils.GOB)
|
||||
if err == nil { //Connected so no need to reiterate
|
||||
break
|
||||
}
|
||||
@@ -111,7 +110,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
connector = &engine.RPCClientConnector{Client: client}
|
||||
}
|
||||
var err error
|
||||
medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cfg)
|
||||
medi, err = engine.NewMediator(connector, loggerDb, cdrDb, cfg)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
|
||||
exitChan <- true
|
||||
@@ -149,8 +148,8 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
|
||||
var client *rpcclient.RpcClient
|
||||
var err error
|
||||
|
||||
for i := 0; i < cfg.SMRaterReconnects; i++ {
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMRaterReconnects, utils.GOB)
|
||||
for i := 0; i < cfg.SMReconnects; i++ {
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMReconnects, utils.GOB)
|
||||
if err == nil { //Connected so no need to reiterate
|
||||
break
|
||||
}
|
||||
@@ -165,14 +164,15 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
|
||||
switch cfg.SMSwitchType {
|
||||
case FS:
|
||||
dp, _ := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval))
|
||||
sm = sessionmanager.NewFSSessionManager(loggerDb, connector, dp)
|
||||
errConn := sm.Connect(cfg)
|
||||
if errConn != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", errConn))
|
||||
}
|
||||
sm = sessionmanager.NewFSSessionManager(cfg, loggerDb, connector, dp)
|
||||
case OSIPS:
|
||||
sm, _ = sessionmanager.NewOSipsSessionManager(cfg, connector)
|
||||
default:
|
||||
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))
|
||||
}
|
||||
if err = sm.Connect(); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
@@ -185,8 +185,11 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, d
|
||||
return
|
||||
}
|
||||
}
|
||||
cdrServer = cdrs.New(cdrDb, medi, cfg)
|
||||
cdrServer = engine.NewCdrS(cdrDb, medi, cfg)
|
||||
cdrServer.RegisterHanlersToServer(server)
|
||||
engine.Logger.Info("Registering CDRS RPC service.")
|
||||
server.RpcRegister(&apier.CDRSV1{CdrSrv: cdrServer})
|
||||
responder.CdrSrv = cdrServer // Make the cdrserver available for internal communication
|
||||
close(doneChan)
|
||||
}
|
||||
|
||||
|
||||
@@ -105,18 +105,23 @@ type CGRConfig struct {
|
||||
SMEnabled bool
|
||||
SMSwitchType string
|
||||
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
|
||||
SMRaterReconnects int // Number of reconnect attempts to rater
|
||||
SMReconnects int // Number of reconnect attempts to rater
|
||||
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
|
||||
SMMaxCallDuration time.Duration // The maximum duration of a call
|
||||
SMMinCallDuration time.Duration // Only authorize calls with allowed duration bigger than this
|
||||
MediatorEnabled bool // Starts Mediator service: <true|false>.
|
||||
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
|
||||
MediatorReconnects int // Number of reconnects to rater before giving up.
|
||||
DerivedChargers utils.DerivedChargers // System wide derived chargers, added to the account level ones
|
||||
CombinedDerivedChargers bool // Combine accounts specific derived_chargers with server configured
|
||||
FreeswitchServer string // freeswitch address host:port
|
||||
FreeswitchPass string // FS socket password
|
||||
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
|
||||
OsipsListenUdp string // Address where to listen for event datagrams coming from OpenSIPS
|
||||
OsipsMiAddr string // Adress where to reach OpenSIPS mi_datagram module
|
||||
OsipsEvSubscInterval time.Duration // Refresh event subscription at this interval
|
||||
OsipCDRS string // Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013>
|
||||
OsipsReconnects int // Number of attempts on connect failure.
|
||||
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
|
||||
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
|
||||
HistoryServerEnabled bool // Starts History as server: <true|false>.
|
||||
@@ -190,19 +195,24 @@ func (self *CGRConfig) setDefaults() error {
|
||||
}
|
||||
self.MediatorEnabled = false
|
||||
self.MediatorRater = "internal"
|
||||
self.MediatorRaterReconnects = 3
|
||||
self.MediatorReconnects = 3
|
||||
self.DerivedChargers = make(utils.DerivedChargers, 0)
|
||||
self.CombinedDerivedChargers = true
|
||||
self.SMEnabled = false
|
||||
self.SMSwitchType = FS
|
||||
self.SMRater = "internal"
|
||||
self.SMRaterReconnects = 3
|
||||
self.SMReconnects = 3
|
||||
self.SMDebitInterval = 10
|
||||
self.SMMaxCallDuration = time.Duration(3) * time.Hour
|
||||
self.SMMinCallDuration = time.Duration(0)
|
||||
self.FreeswitchServer = "127.0.0.1:8021"
|
||||
self.FreeswitchPass = "ClueCon"
|
||||
self.FreeswitchReconnects = 5
|
||||
self.OsipsListenUdp = "127.0.0.1:2020"
|
||||
self.OsipsMiAddr = "127.0.0.1:8020"
|
||||
self.OsipsEvSubscInterval = time.Duration(60) * time.Second
|
||||
self.OsipCDRS = "internal"
|
||||
self.OsipsReconnects = 3
|
||||
self.HistoryAgentEnabled = false
|
||||
self.HistoryServerEnabled = false
|
||||
self.HistoryServer = "internal"
|
||||
@@ -493,8 +503,8 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("mediator", "rater"); hasOpt {
|
||||
cfg.MediatorRater, _ = c.GetString("mediator", "rater")
|
||||
}
|
||||
if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt {
|
||||
cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects")
|
||||
if hasOpt = c.HasOption("mediator", "reconnects"); hasOpt {
|
||||
cfg.MediatorReconnects, _ = c.GetInt("mediator", "reconnects")
|
||||
}
|
||||
if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt {
|
||||
cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled")
|
||||
@@ -505,8 +515,8 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("session_manager", "rater"); hasOpt {
|
||||
cfg.SMRater, _ = c.GetString("session_manager", "rater")
|
||||
}
|
||||
if hasOpt = c.HasOption("session_manager", "rater_reconnects"); hasOpt {
|
||||
cfg.SMRaterReconnects, _ = c.GetInt("session_manager", "rater_reconnects")
|
||||
if hasOpt = c.HasOption("session_manager", "reconnects"); hasOpt {
|
||||
cfg.SMReconnects, _ = c.GetInt("session_manager", "reconnects")
|
||||
}
|
||||
if hasOpt = c.HasOption("session_manager", "debit_interval"); hasOpt {
|
||||
cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval")
|
||||
@@ -532,6 +542,24 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
|
||||
if hasOpt = c.HasOption("freeswitch", "reconnects"); hasOpt {
|
||||
cfg.FreeswitchReconnects, _ = c.GetInt("freeswitch", "reconnects")
|
||||
}
|
||||
if hasOpt = c.HasOption("opensips", "listen_udp"); hasOpt {
|
||||
cfg.OsipsListenUdp, _ = c.GetString("opensips", "listen_udp")
|
||||
}
|
||||
if hasOpt = c.HasOption("opensips", "mi_addr"); hasOpt {
|
||||
cfg.OsipsMiAddr, _ = c.GetString("opensips", "mi_addr")
|
||||
}
|
||||
if hasOpt = c.HasOption("opensips", "events_subscribe_interval"); hasOpt {
|
||||
evSubscIntervalStr, _ := c.GetString("opensips", "events_subscribe_interval")
|
||||
if cfg.OsipsEvSubscInterval, err = utils.ParseDurationWithSecs(evSubscIntervalStr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if hasOpt = c.HasOption("opensips", "cdrs"); hasOpt {
|
||||
cfg.OsipCDRS, _ = c.GetString("opensips", "cdrs")
|
||||
}
|
||||
if hasOpt = c.HasOption("opensips", "reconnects"); hasOpt {
|
||||
cfg.OsipsReconnects, _ = c.GetInt("opensips", "reconnects")
|
||||
}
|
||||
if cfg.DerivedChargers, err = ParseCfgDerivedCharging(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -107,17 +107,22 @@ func TestDefaults(t *testing.T) {
|
||||
}
|
||||
eCfg.MediatorEnabled = false
|
||||
eCfg.MediatorRater = "internal"
|
||||
eCfg.MediatorRaterReconnects = 3
|
||||
eCfg.MediatorReconnects = 3
|
||||
eCfg.SMEnabled = false
|
||||
eCfg.SMSwitchType = FS
|
||||
eCfg.SMRater = "internal"
|
||||
eCfg.SMRaterReconnects = 3
|
||||
eCfg.SMReconnects = 3
|
||||
eCfg.SMDebitInterval = 10
|
||||
eCfg.SMMinCallDuration = time.Duration(0)
|
||||
eCfg.SMMaxCallDuration = time.Duration(3) * time.Hour
|
||||
eCfg.FreeswitchServer = "127.0.0.1:8021"
|
||||
eCfg.FreeswitchPass = "ClueCon"
|
||||
eCfg.FreeswitchReconnects = 5
|
||||
eCfg.OsipsListenUdp = "127.0.0.1:2020"
|
||||
eCfg.OsipsMiAddr = "127.0.0.1:8020"
|
||||
eCfg.OsipsEvSubscInterval = time.Duration(60) * time.Second
|
||||
eCfg.OsipCDRS = "internal"
|
||||
eCfg.OsipsReconnects = 3
|
||||
eCfg.DerivedChargers = make(utils.DerivedChargers, 0)
|
||||
eCfg.CombinedDerivedChargers = true
|
||||
eCfg.HistoryAgentEnabled = false
|
||||
@@ -242,17 +247,22 @@ func TestConfigFromFile(t *testing.T) {
|
||||
}
|
||||
eCfg.MediatorEnabled = true
|
||||
eCfg.MediatorRater = "test"
|
||||
eCfg.MediatorRaterReconnects = 99
|
||||
eCfg.MediatorReconnects = 99
|
||||
eCfg.SMEnabled = true
|
||||
eCfg.SMSwitchType = "test"
|
||||
eCfg.SMRater = "test"
|
||||
eCfg.SMRaterReconnects = 99
|
||||
eCfg.SMReconnects = 99
|
||||
eCfg.SMDebitInterval = 99
|
||||
eCfg.SMMinCallDuration = time.Duration(98) * time.Second
|
||||
eCfg.SMMaxCallDuration = time.Duration(99) * time.Second
|
||||
eCfg.FreeswitchServer = "test"
|
||||
eCfg.FreeswitchPass = "test"
|
||||
eCfg.FreeswitchReconnects = 99
|
||||
eCfg.OsipsListenUdp = "test"
|
||||
eCfg.OsipsMiAddr = "test"
|
||||
eCfg.OsipsEvSubscInterval = time.Duration(99) * time.Second
|
||||
eCfg.OsipCDRS = "test"
|
||||
eCfg.OsipsReconnects = 99
|
||||
eCfg.DerivedChargers = utils.DerivedChargers{&utils.DerivedCharger{RunId: "test", RunFilters: "", ReqTypeField: "test", DirectionField: "test", TenantField: "test",
|
||||
CategoryField: "test", AccountField: "test", SubjectField: "test", DestinationField: "test", SetupTimeField: "test", AnswerTimeField: "test", UsageField: "test"}}
|
||||
eCfg.CombinedDerivedChargers = true
|
||||
|
||||
@@ -83,22 +83,29 @@ extra_fields = test:test # Field identifiers of the fields to add in extra field
|
||||
[mediator]
|
||||
enabled = true # Starts Mediacategory service: <true|false>.
|
||||
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
|
||||
rater_reconnects = 99 # Number of reconnects to rater before giving up.
|
||||
reconnects = 99 # Number of reconnects to rater before giving up.
|
||||
|
||||
[session_manager]
|
||||
enabled = true # Starts SessionManager service: <true|false>.
|
||||
switch_type = test # Defines the type of switch behind: <freeswitch>.
|
||||
rater = test # Address where to reach the Rater.
|
||||
rater_reconnects = 99 # Number of reconnects to rater before giving up.
|
||||
reconnects = 99 # Number of reconnects to rater before giving up.
|
||||
debit_interval = 99 # Interval to perform debits on.
|
||||
max_call_duration = 99 # Maximum call duration a prepaid call can last
|
||||
min_call_duration = 98 # Only authorize calls with allowed duration bigger than this
|
||||
min_call_duration = 98 # Only authorize calls with allowed duration bigger than this
|
||||
max_call_duration = 99 # Maximum call duration a prepaid call can last
|
||||
|
||||
[freeswitch]
|
||||
server = test # Adress where to connect to FreeSWITCH socket.
|
||||
passwd = test # FreeSWITCH socket password.
|
||||
reconnects = 99 # Number of attempts on connect failure.
|
||||
|
||||
[opensips]
|
||||
listen_udp = test # Address where to listen for event datagrams coming from OpenSIPS
|
||||
mi_addr = test # Adress where to reach OpenSIPS mi_datagram module
|
||||
events_subscribe_interval = 99 # Automatic events subscription to OpenSIPS, 0 to disable it
|
||||
cdrs = test # Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013>
|
||||
reconnects = 99 # Number of attempts on connect failure.
|
||||
|
||||
[derived_charging]
|
||||
run_ids = test # Identifiers of additional sessions control.
|
||||
run_filters = # No filters applied
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# Real-time Charging System for Telecom & ISP environments
|
||||
# Copyright (C) 2012-2014 ITsysCOM GmbH
|
||||
# Copyright (C) ITsysCOM GmbH
|
||||
#
|
||||
# This file contains the default configuration hardcoded into CGRateS.
|
||||
# This is what you get when you load CGRateS with an empty configuration file.
|
||||
@@ -90,12 +90,12 @@
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
|
||||
[session_manager]
|
||||
# enabled = false # Starts SessionManager service: <true|false>.
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
|
||||
# rater = internal # Address where to reach the Rater.
|
||||
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
|
||||
# enabled = false # Starts SessionManager service: <true|false>
|
||||
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>
|
||||
# rater = internal # Address where to reach the Rater
|
||||
# reconnects = 3 # Number of reconnects to rater/cdrs before giving up.
|
||||
# debit_interval = 10 # Interval to perform debits on.
|
||||
# min_call_duration = 0s # Only authorize calls with allowed duration bigger than this
|
||||
# min_call_duration = 0s Only authorize calls with allowed duration bigger than this
|
||||
# max_call_duration = 3h # Maximum call duration a prepaid call can last
|
||||
|
||||
|
||||
@@ -104,9 +104,16 @@
|
||||
# passwd = ClueCon # FreeSWITCH socket password.
|
||||
# reconnects = 5 # Number of attempts on connect failure.
|
||||
|
||||
[opensips]
|
||||
# listen_udp = 127.0.0.1:2020 # Address where to listen for datagram events coming from OpenSIPS
|
||||
# mi_addr = 127.0.0.1:8020 # Adress where to reach OpenSIPS mi_datagram module
|
||||
# events_subscribe_interval = 60s # Automatic events subscription to OpenSIPS, 0 to disable it
|
||||
# cdrs = internal # Address where to reach CDR Server, empty to disable CDR processing <""|internal|127.0.0.1:2013>
|
||||
# reconnects = 3 # Number of attempts on connect failure.
|
||||
|
||||
[derived_charging]
|
||||
# run_ids = # Identifiers of additional sessions control.
|
||||
# run_filters = # List of cdr field filters for each run.
|
||||
# run_filters = # List of cdr field filters for each run.
|
||||
# reqtype_fields = # Name of request type fields to be used during additional sessions control <""|*default|field_name>.
|
||||
# direction_fields = # Name of direction fields to be used during additional sessions control <""|*default|field_name>.
|
||||
# tenant_fields = # Name of tenant fields to be used during additional sessions control <""|*default|field_name>.
|
||||
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cdrs
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@@ -24,15 +24,13 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/mediator"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
cfg *config.CGRConfig // Share the configuration with the rest of the package
|
||||
storage engine.CdrStorage
|
||||
medi *mediator.Mediator
|
||||
storage CdrStorage
|
||||
medi *Mediator
|
||||
)
|
||||
|
||||
// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
|
||||
@@ -43,7 +41,7 @@ func storeAndMediate(storedCdr *utils.StoredCdr) error {
|
||||
if cfg.CDRSMediator == utils.INTERNAL {
|
||||
go func() {
|
||||
if err := medi.RateCdr(storedCdr); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", err.Error()))
|
||||
Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", err.Error()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -54,10 +52,10 @@ func storeAndMediate(storedCdr *utils.StoredCdr) error {
|
||||
func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
cgrCdr, err := utils.NewCgrCdrFromHttpReq(r)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
}
|
||||
if err := storeAndMediate(cgrCdr.AsStoredCdr()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,28 +64,28 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
fsCdr, err := NewFSCdr(body)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
Logger.Err(fmt.Sprintf("Could not create CDR entry: %s", err.Error()))
|
||||
}
|
||||
if err := storeAndMediate(fsCdr.AsStoredCdr()); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
Logger.Err(fmt.Sprintf("Errors when storing CDR entry: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
type CDRS struct{}
|
||||
|
||||
func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS {
|
||||
func NewCdrS(s CdrStorage, m *Mediator, c *config.CGRConfig) *CDRS {
|
||||
storage = s
|
||||
medi = m
|
||||
cfg = c
|
||||
return &CDRS{}
|
||||
}
|
||||
|
||||
func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) {
|
||||
func (cdrs *CDRS) RegisterHanlersToServer(server *Server) {
|
||||
server.RegisterHttpFunc("/cgr", cgrCdrHandler)
|
||||
server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler)
|
||||
}
|
||||
|
||||
// Used to internally process CDR
|
||||
func (cdrs *CDRS) ProcessRawCdr(rawCdr utils.RawCdr) error {
|
||||
return storeAndMediate(rawCdr.AsStoredCdr())
|
||||
func (cdrs *CDRS) ProcessCdr(cdr *utils.StoredCdr) error {
|
||||
return storeAndMediate(cdr)
|
||||
}
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cdrs
|
||||
package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -111,11 +110,11 @@ func (fsCdr FSCdr) searchExtraField(field string, body map[string]interface{}) (
|
||||
return
|
||||
}
|
||||
} else {
|
||||
engine.Logger.Warning(fmt.Sprintf("Slice with no maps: %v", reflect.TypeOf(item)))
|
||||
Logger.Warning(fmt.Sprintf("Slice with no maps: %v", reflect.TypeOf(item)))
|
||||
}
|
||||
}
|
||||
default:
|
||||
engine.Logger.Warning(fmt.Sprintf("Unexpected type: %v", reflect.TypeOf(v)))
|
||||
Logger.Warning(fmt.Sprintf("Unexpected type: %v", reflect.TypeOf(v)))
|
||||
}
|
||||
}
|
||||
return
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package cdrs
|
||||
package engine
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
@@ -44,7 +44,7 @@ README:
|
||||
var ratingDbCsv, ratingDbStor, ratingDbApier RatingStorage // Each ratingDb will have it's own sources to collect data
|
||||
var accountDbCsv, accountDbStor, accountDbApier AccountingStorage // Each ratingDb will have it's own sources to collect data
|
||||
var storDb LoadStorage
|
||||
var cfg *config.CGRConfig
|
||||
var lCfg *config.CGRConfig
|
||||
|
||||
// Arguments received via test command
|
||||
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
|
||||
@@ -57,27 +57,27 @@ func TestConnDataDbs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
cfg, _ = config.NewDefaultCGRConfig()
|
||||
lCfg, _ = config.NewDefaultCGRConfig()
|
||||
var err error
|
||||
if ratingDbCsv, err = ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, "4", cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if ratingDbCsv, err = ConfigureRatingStorage(lCfg.RatingDBType, lCfg.RatingDBHost, lCfg.RatingDBPort, "4", lCfg.RatingDBUser, lCfg.RatingDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
if ratingDbStor, err = ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, "5", cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if ratingDbStor, err = ConfigureRatingStorage(lCfg.RatingDBType, lCfg.RatingDBHost, lCfg.RatingDBPort, "5", lCfg.RatingDBUser, lCfg.RatingDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
if ratingDbApier, err = ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, "6", cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if ratingDbApier, err = ConfigureRatingStorage(lCfg.RatingDBType, lCfg.RatingDBHost, lCfg.RatingDBPort, "6", lCfg.RatingDBUser, lCfg.RatingDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
if accountDbCsv, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "7",
|
||||
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if accountDbCsv, err = ConfigureAccountingStorage(lCfg.AccountDBType, lCfg.AccountDBHost, lCfg.AccountDBPort, "7",
|
||||
lCfg.AccountDBUser, lCfg.AccountDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
if accountDbStor, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "8",
|
||||
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if accountDbStor, err = ConfigureAccountingStorage(lCfg.AccountDBType, lCfg.AccountDBHost, lCfg.AccountDBPort, "8",
|
||||
lCfg.AccountDBUser, lCfg.AccountDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
if accountDbApier, err = ConfigureAccountingStorage(cfg.AccountDBType, cfg.AccountDBHost, cfg.AccountDBPort, "9",
|
||||
cfg.AccountDBUser, cfg.AccountDBPass, cfg.DBDataEncoding); err != nil {
|
||||
if accountDbApier, err = ConfigureAccountingStorage(lCfg.AccountDBType, lCfg.AccountDBHost, lCfg.AccountDBPort, "9",
|
||||
lCfg.AccountDBUser, lCfg.AccountDBPass, lCfg.DBDataEncoding); err != nil {
|
||||
t.Fatal("Error on ratingDb connection: ", err.Error())
|
||||
}
|
||||
for _, db := range []Storage{ratingDbCsv, ratingDbStor, ratingDbApier, accountDbCsv, accountDbStor, accountDbApier} {
|
||||
@@ -94,7 +94,7 @@ func TestCreateStorTpTables(t *testing.T) {
|
||||
return
|
||||
}
|
||||
var db *MySQLStorage
|
||||
if d, err := NewMySQLStorage(cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
|
||||
if d, err := NewMySQLStorage(lCfg.StorDBHost, lCfg.StorDBPort, lCfg.StorDBName, lCfg.StorDBUser, lCfg.StorDBPass); err != nil {
|
||||
t.Error("Error on opening database connection: ", err)
|
||||
return
|
||||
} else {
|
||||
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package mediator
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@@ -24,11 +24,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
|
||||
func NewMediator(connector Connector, logDb LogStorage, cdrDb CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
|
||||
m = &Mediator{
|
||||
connector: connector,
|
||||
logDb: logDb,
|
||||
@@ -39,16 +38,16 @@ func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engi
|
||||
}
|
||||
|
||||
type Mediator struct {
|
||||
connector engine.Connector
|
||||
logDb engine.LogStorage
|
||||
cdrDb engine.CdrStorage
|
||||
connector Connector
|
||||
logDb LogStorage
|
||||
cdrDb CdrStorage
|
||||
cgrCfg *config.CGRConfig
|
||||
}
|
||||
|
||||
// Retrive the cost from logging database, nil in case of no log
|
||||
func (self *Mediator) getCostsFromDB(cgrid, runId string) (cc *engine.CallCost, err error) {
|
||||
func (self *Mediator) getCostsFromDB(cgrid, runId string) (cc *CallCost, err error) {
|
||||
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
|
||||
cc, err = self.logDb.GetCallCostLog(cgrid, engine.SESSION_MANAGER_SOURCE, runId)
|
||||
cc, err = self.logDb.GetCallCostLog(cgrid, SESSION_MANAGER_SOURCE, runId)
|
||||
if cc != nil {
|
||||
break
|
||||
}
|
||||
@@ -58,13 +57,13 @@ func (self *Mediator) getCostsFromDB(cgrid, runId string) (cc *engine.CallCost,
|
||||
}
|
||||
|
||||
// Retrive the cost from engine
|
||||
func (self *Mediator) getCostFromRater(storedCdr *utils.StoredCdr) (*engine.CallCost, error) {
|
||||
cc := &engine.CallCost{}
|
||||
func (self *Mediator) getCostFromRater(storedCdr *utils.StoredCdr) (*CallCost, error) {
|
||||
cc := &CallCost{}
|
||||
var err error
|
||||
if storedCdr.Usage == time.Duration(0) { // failed call, returning empty callcost, no error
|
||||
return cc, nil
|
||||
}
|
||||
cd := engine.CallDescriptor{
|
||||
cd := CallDescriptor{
|
||||
TOR: storedCdr.TOR,
|
||||
Direction: storedCdr.Direction,
|
||||
Tenant: storedCdr.Tenant,
|
||||
@@ -82,16 +81,16 @@ func (self *Mediator) getCostFromRater(storedCdr *utils.StoredCdr) (*engine.Call
|
||||
err = self.connector.GetCost(cd, cc)
|
||||
}
|
||||
if err != nil {
|
||||
self.logDb.LogError(storedCdr.CgrId, engine.MEDIATOR_SOURCE, storedCdr.MediationRunId, err.Error())
|
||||
self.logDb.LogError(storedCdr.CgrId, MEDIATOR_SOURCE, storedCdr.MediationRunId, err.Error())
|
||||
} else {
|
||||
// If the mediator calculated a price it will write it to logdb
|
||||
self.logDb.LogCallCost(storedCdr.CgrId, engine.MEDIATOR_SOURCE, storedCdr.MediationRunId, cc)
|
||||
self.logDb.LogCallCost(storedCdr.CgrId, MEDIATOR_SOURCE, storedCdr.MediationRunId, cc)
|
||||
}
|
||||
return cc, err
|
||||
}
|
||||
|
||||
func (self *Mediator) rateCDR(storedCdr *utils.StoredCdr) error {
|
||||
var qryCC *engine.CallCost
|
||||
var qryCC *CallCost
|
||||
var errCost error
|
||||
if storedCdr.ReqType == utils.PREPAID {
|
||||
// Should be previously calculated and stored in DB
|
||||
@@ -116,7 +115,7 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
var dcs utils.DerivedChargers
|
||||
if err := self.connector.GetDerivedChargers(attrsDC, &dcs); err != nil {
|
||||
errText := fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", storedCdr.CgrId, err.Error())
|
||||
engine.Logger.Err(errText)
|
||||
Logger.Err(errText)
|
||||
return errors.New(errText)
|
||||
}
|
||||
for _, dc := range dcs {
|
||||
@@ -156,7 +155,7 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
extraInfo = err.Error()
|
||||
}
|
||||
if err := self.cdrDb.SetRatedCdr(cdr, extraInfo); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not record cost for cgrid: <%s>, ERROR: <%s>, cost: %f, extraInfo: %s",
|
||||
Logger.Err(fmt.Sprintf("<Mediator> Could not record cost for cgrid: <%s>, ERROR: <%s>, cost: %f, extraInfo: %s",
|
||||
cdr.CgrId, err.Error(), cdr.Cost, extraInfo))
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package mediator
|
||||
package engine
|
||||
|
||||
import (
|
||||
"flag"
|
||||
@@ -31,7 +31,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -49,13 +48,11 @@ README:
|
||||
* Execute remote Apis and test their replies(follow prepaid1cent scenario so we can test load in dataDb also).
|
||||
*/
|
||||
|
||||
var cfg *config.CGRConfig
|
||||
var cgrCfg *config.CGRConfig
|
||||
var cgrRpc *rpc.Client
|
||||
var cdrStor engine.CdrStorage
|
||||
var cdrStor CdrStorage
|
||||
var httpClient *http.Client
|
||||
|
||||
var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args
|
||||
var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
|
||||
var storDbType = flag.String("stordb_type", utils.MYSQL, "The type of the storDb database <mysql>")
|
||||
var startDelay = flag.Int("delay_start", 300, "Number of miliseconds to it for rater to start and cache")
|
||||
var cfgPath = path.Join(*dataDir, "conf", "samples", "mediator_test1.cfg")
|
||||
@@ -65,11 +62,11 @@ func TestInitRatingDb(t *testing.T) {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
cfg, err = config.NewCGRConfigFromFile(&cfgPath)
|
||||
cgrCfg, err = config.NewCGRConfigFromFile(&cfgPath)
|
||||
if err != nil {
|
||||
t.Fatal("Got config error: ", err.Error())
|
||||
}
|
||||
ratingDb, err := engine.ConfigureRatingStorage(cfg.RatingDBType, cfg.RatingDBHost, cfg.RatingDBPort, cfg.RatingDBName, cfg.RatingDBUser, cfg.RatingDBPass, cfg.DBDataEncoding)
|
||||
ratingDb, err := ConfigureRatingStorage(cgrCfg.RatingDBType, cgrCfg.RatingDBHost, cgrCfg.RatingDBPort, cgrCfg.RatingDBName, cgrCfg.RatingDBUser, cgrCfg.RatingDBPass, cgrCfg.DBDataEncoding)
|
||||
if err != nil {
|
||||
t.Fatal("Cannot connect to dataDb", err)
|
||||
}
|
||||
@@ -86,14 +83,14 @@ func TestInitStorDb(t *testing.T) {
|
||||
if *storDbType != utils.MYSQL {
|
||||
t.Fatal("Unsupported storDbType")
|
||||
}
|
||||
var mysql *engine.MySQLStorage
|
||||
var mysql *MySQLStorage
|
||||
var err error
|
||||
if cdrStor, err = engine.ConfigureCdrStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass); err != nil {
|
||||
if cdrStor, err = ConfigureCdrStorage(cgrCfg.StorDBType, cgrCfg.StorDBHost, cgrCfg.StorDBPort, cgrCfg.StorDBName, cgrCfg.StorDBUser, cgrCfg.StorDBPass); err != nil {
|
||||
t.Fatal("Error on opening database connection: ", err)
|
||||
} else {
|
||||
mysql = cdrStor.(*engine.MySQLStorage)
|
||||
mysql = cdrStor.(*MySQLStorage)
|
||||
}
|
||||
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, engine.CREATE_CDRS_TABLES_SQL)); err != nil {
|
||||
if err := mysql.CreateTablesFromScript(path.Join(*dataDir, "storage", *storDbType, CREATE_CDRS_TABLES_SQL)); err != nil {
|
||||
t.Fatal("Error on mysql creation: ", err.Error())
|
||||
return // No point in going further
|
||||
}
|
||||
@@ -129,7 +126,7 @@ func TestRpcConn(t *testing.T) {
|
||||
}
|
||||
var err error
|
||||
//cgrRpc, err = rpc.Dial("tcp", cfg.RPCGOBListen) //ToDo: Fix with automatic config
|
||||
cgrRpc, err = jsonrpc.Dial("tcp", cfg.RPCJSONListen)
|
||||
cgrRpc, err = jsonrpc.Dial("tcp", cgrCfg.RPCJSONListen)
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to CGR GOB-RPC Server: ", err.Error())
|
||||
}
|
||||
@@ -151,8 +148,8 @@ func TestPostCdrs(t *testing.T) {
|
||||
utils.ACCOUNT: []string{"1010"}, utils.SUBJECT: []string{"1010"}, utils.ANSWER_TIME: []string{"2013-11-07T08:42:26Z"},
|
||||
utils.USAGE: []string{"10"}, "field_extr1": []string{"val_extr1"}, "fieldextr2": []string{"valextr2"}}
|
||||
for _, cdrForm := range []url.Values{cdrForm1, cdrForm2, cdrFormData1} {
|
||||
cdrForm.Set(utils.CDRSOURCE, engine.TEST_SQL)
|
||||
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cfg.HTTPListen), cdrForm); err != nil {
|
||||
cdrForm.Set(utils.CDRSOURCE, TEST_SQL)
|
||||
if _, err := httpClient.PostForm(fmt.Sprintf("http://%s/cgr", cgrCfg.HTTPListen), cdrForm); err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
}
|
||||
@@ -174,10 +171,10 @@ func TestInjectCdrs(t *testing.T) {
|
||||
if !*testLocal {
|
||||
return
|
||||
}
|
||||
cgrCdr1 := utils.CgrCdr{utils.TOR: utils.VOICE, utils.ACCID: "aaaaadsafdsaf", "cdrsource": engine.TEST_SQL, utils.CDRHOST: "192.168.1.1", utils.REQTYPE: "rated", utils.DIRECTION: "*out",
|
||||
cgrCdr1 := utils.CgrCdr{utils.TOR: utils.VOICE, utils.ACCID: "aaaaadsafdsaf", "cdrsource": TEST_SQL, utils.CDRHOST: "192.168.1.1", utils.REQTYPE: "rated", utils.DIRECTION: "*out",
|
||||
utils.TENANT: "cgrates.org", utils.CATEGORY: "call", utils.ACCOUNT: "dan", utils.SUBJECT: "dan", utils.DESTINATION: "+4986517174963",
|
||||
utils.ANSWER_TIME: "2013-11-07T08:42:26Z", utils.USAGE: "10"}
|
||||
cgrCdr2 := utils.CgrCdr{utils.TOR: utils.VOICE, utils.ACCID: "baaaadsafdsaf", "cdrsource": engine.TEST_SQL, utils.CDRHOST: "192.168.1.1", utils.REQTYPE: "rated", utils.DIRECTION: "*out",
|
||||
cgrCdr2 := utils.CgrCdr{utils.TOR: utils.VOICE, utils.ACCID: "baaaadsafdsaf", "cdrsource": TEST_SQL, utils.CDRHOST: "192.168.1.1", utils.REQTYPE: "rated", utils.DIRECTION: "*out",
|
||||
utils.TENANT: "cgrates.org", utils.CATEGORY: "call", utils.ACCOUNT: "dan", utils.SUBJECT: "dan", utils.DESTINATION: "+4986517173964",
|
||||
utils.ANSWER_TIME: "2013-11-07T09:42:26Z", utils.USAGE: "20"}
|
||||
for _, cdr := range []utils.CgrCdr{cgrCdr1, cgrCdr2} {
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
type Responder struct {
|
||||
Bal *balancer2go.Balancer
|
||||
ExitChan chan bool
|
||||
CdrSrv *CDRS
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -128,6 +129,17 @@ func (rs *Responder) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *ut
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *Responder) ProcessCdr(cdr *utils.StoredCdr, reply *string) error {
|
||||
if rs.CdrSrv == nil {
|
||||
return errors.New("CdrServerNotRunning")
|
||||
}
|
||||
if err := rs.CdrSrv.ProcessCdr(cdr); err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *Responder) FlushCache(arg CallDescriptor, reply *float64) (err error) {
|
||||
if rs.Bal != nil {
|
||||
*reply, err = rs.callMethod(&arg, "Responder.FlushCache")
|
||||
@@ -287,6 +299,7 @@ type Connector interface {
|
||||
RefundIncrements(CallDescriptor, *float64) error
|
||||
GetMaxSessionTime(CallDescriptor, *float64) error
|
||||
GetDerivedChargers(utils.AttrDerivedChargers, *utils.DerivedChargers) error
|
||||
ProcessCdr(*utils.StoredCdr, *string) error
|
||||
}
|
||||
|
||||
type RPCClientConnector struct {
|
||||
@@ -316,3 +329,7 @@ func (rcc *RPCClientConnector) GetMaxSessionTime(cd CallDescriptor, resp *float6
|
||||
func (rcc *RPCClientConnector) GetDerivedChargers(attrs utils.AttrDerivedChargers, dcs *utils.DerivedChargers) error {
|
||||
return rcc.Client.Call("ApierV1.GetDerivedChargers", attrs, dcs)
|
||||
}
|
||||
|
||||
func (rcc *RPCClientConnector) ProcessCdr(cdr *utils.StoredCdr, reply *string) error {
|
||||
return rcc.Client.Call("CDRSV1.ProcessCdr", cdr, reply)
|
||||
}
|
||||
|
||||
@@ -21,8 +21,8 @@ package general_tests
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/cdrs"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/sessionmanager"
|
||||
)
|
||||
|
||||
@@ -215,12 +215,12 @@ var jsonCdr = []byte(`{"core-uuid":"feef0b51-7fdf-4c4a-878e-aff233752de2","chann
|
||||
|
||||
func TestEvCorelate(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
cdrs.New(nil, nil, cfg) // So we can set the package cfg
|
||||
engine.NewCdrS(nil, nil, cfg) // So we can set the package cfg
|
||||
answerEv := new(sessionmanager.FSEvent).New(answerEvent)
|
||||
if answerEv.GetName() != "CHANNEL_ANSWER" {
|
||||
t.Error("Event not parsed correctly: ", answerEv)
|
||||
}
|
||||
cdrEv, err := cdrs.NewFSCdr(jsonCdr)
|
||||
cdrEv, err := engine.NewFSCdr(jsonCdr)
|
||||
if err != nil {
|
||||
t.Errorf("Error loading cdr: %v", err.Error())
|
||||
} else if cdrEv.AsStoredCdr().AccId != "86cfd6e2-dbda-45a3-b59d-f683ec368e8b" {
|
||||
|
||||
@@ -8,8 +8,6 @@ go test github.com/cgrates/cgrates/engine -local
|
||||
en=$?
|
||||
go test github.com/cgrates/cgrates/cdrc -local
|
||||
cdrc=$?
|
||||
go test github.com/cgrates/cgrates/mediator -local
|
||||
med=$?
|
||||
go test github.com/cgrates/cgrates/config -local
|
||||
cfg=$?
|
||||
go test github.com/cgrates/cgrates/utils -local
|
||||
@@ -20,5 +18,5 @@ utl=$?
|
||||
|
||||
|
||||
|
||||
exit $gen && $ap && $en && $cdrc && $med && $cfg && $utl
|
||||
exit $gen && $ap && $en && $cdrc && $cfg && $utl
|
||||
|
||||
|
||||
@@ -1,105 +0,0 @@
|
||||
/*
|
||||
Rating system designed to be used in VoIP Carriers World
|
||||
Copyright (C) 2013 ITsysCOM
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package mediator
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FScsvCDR struct {
|
||||
rowData []string // The original row extracted form csv file
|
||||
accIdIdx,
|
||||
subjectIdx,
|
||||
reqtypeIdx,
|
||||
directionIdx,
|
||||
tenantIdx,
|
||||
torIdx,
|
||||
accountIdx,
|
||||
destinationIdx,
|
||||
setupTimeIdx,
|
||||
answerTimeIdx,
|
||||
durationIdx int // Field indexes
|
||||
cgrCfg *config.CGRConfig // CGR Config instance
|
||||
}
|
||||
|
||||
func NewFScsvCDR(cdrRow []string, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx, torIdx,
|
||||
accountIdx, destinationIdx, setupTimeIdx, answerTimeIdx, durationIdx int, cfg *config.CGRConfig) (*FScsvCDR, error) {
|
||||
fscdr := FScsvCDR{cdrRow, accIdIdx, subjectIdx, reqtypeIdx, directionIdx, tenantIdx,
|
||||
torIdx, accountIdx, destinationIdx, setupTimeIdx, answerTimeIdx, durationIdx, cfg}
|
||||
return &fscdr, nil
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetCgrId() string {
|
||||
return utils.Sha1(self.rowData[self.accIdIdx], self.rowData[self.setupTimeIdx])
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetAccId() string {
|
||||
return self.rowData[self.accIdIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetCdrHost() string {
|
||||
return utils.LOCALHOST // ToDo: Maybe extract dynamically the external IP address here
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetDirection() string {
|
||||
return "*out"
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetSubject() string {
|
||||
return self.rowData[self.subjectIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetAccount() string {
|
||||
return self.rowData[self.accountIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetDestination() string {
|
||||
return self.rowData[self.destinationIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetTOR() string {
|
||||
return self.rowData[self.torIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetTenant() string {
|
||||
return self.rowData[self.tenantIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetReqType() string {
|
||||
if self.reqtypeIdx == -1 {
|
||||
return self.cgrCfg.DefaultReqType
|
||||
}
|
||||
return self.rowData[self.reqtypeIdx]
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetAnswerTime() (time.Time, error) {
|
||||
return time.Parse("2006-01-02 15:04:05", self.rowData[self.answerTimeIdx])
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetDuration() int64 {
|
||||
dur, _ := strconv.ParseInt(self.rowData[self.durationIdx], 0, 64)
|
||||
return dur
|
||||
}
|
||||
|
||||
func (self *FScsvCDR) GetExtraFields() map[string]string {
|
||||
return nil
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) 2012-2014 ITsysCOM GmbH
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@@ -45,14 +45,14 @@ type FSSessionManager struct {
|
||||
loggerDB engine.LogStorage
|
||||
}
|
||||
|
||||
func NewFSSessionManager(storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager {
|
||||
func NewFSSessionManager(cgrCfg *config.CGRConfig, storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager {
|
||||
cfg = cgrCfg // make config global
|
||||
return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod}
|
||||
}
|
||||
|
||||
// Connects to the freeswitch mod_event_socket server and starts
|
||||
// listening for events.
|
||||
func (sm *FSSessionManager) Connect(cgrCfg *config.CGRConfig) (err error) {
|
||||
cfg = cgrCfg // make config global
|
||||
func (sm *FSSessionManager) Connect() (err error) {
|
||||
eventFilters := map[string]string{"Call-Direction": "inbound"}
|
||||
if fsock.FS, err = fsock.NewFSock(cfg.FreeswitchServer, cfg.FreeswitchPass, cfg.FreeswitchReconnects, sm.createHandlers(), eventFilters, engine.Logger.(*syslog.Writer)); err != nil {
|
||||
return err
|
||||
|
||||
217
sessionmanager/osipsevent.go
Normal file
217
sessionmanager/osipsevent.go
Normal file
@@ -0,0 +1,217 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/osipsdagram"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
/*&{Name:E_ACC_CDR AttrValues:map[to_tag:5ec6e925 cgr_account:dan setuptime:1 created:1406312794 method:INVITE callid:Y2I5ZDYzMDkzM2YzYjhlZjA2Y2ZhZTJmZTc4MGU4NDI.
|
||||
// sip_reason:OK time:1406312795 cgr_reqtype:prepaid cgr_destination:dan cgr_subject:dan sip_code:200 duration:7 from_tag:a5716471] Values:[]}*/
|
||||
|
||||
const (
|
||||
FROM_TAG = "from_tag"
|
||||
TO_TAG = "to_tag"
|
||||
CALLID = "callid"
|
||||
CGR_CATEGORY = "cgr_category"
|
||||
CGR_REQTYPE = "cgr_reqtype"
|
||||
CGR_TENANT = "cgr_tenant"
|
||||
CGR_SUBJECT = "cgr_subject"
|
||||
CGR_ACCOUNT = "cgr_account"
|
||||
CGR_DESTINATION = "cgr_destination"
|
||||
TIME = "time"
|
||||
SETUP_DURATION = "setuptime"
|
||||
OSIPS__SETUP_TIME = "created"
|
||||
OSIPS_DURATION = "duration"
|
||||
)
|
||||
|
||||
func NewOsipsEvent(osipsDagramEvent *osipsdagram.OsipsEvent) (*OsipsEvent, error) {
|
||||
return &OsipsEvent{osipsEvent: osipsDagramEvent}, nil
|
||||
}
|
||||
|
||||
type OsipsEvent struct {
|
||||
osipsEvent *osipsdagram.OsipsEvent
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) New(evStr string) Event {
|
||||
return osipsev
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetName() string {
|
||||
return osipsev.osipsEvent.Name
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetCgrId() string {
|
||||
setupTime, _ := osipsev.GetSetupTime(utils.META_DEFAULT)
|
||||
return utils.Sha1(osipsev.GetUUID(), setupTime.UTC().String())
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetUUID() string {
|
||||
return osipsev.osipsEvent.AttrValues[CALLID] + ";" + osipsev.osipsEvent.AttrValues[FROM_TAG] + ";" + osipsev.osipsEvent.AttrValues[TO_TAG]
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetDirection(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.OUT
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetSubject(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_SUBJECT])
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetAccount(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_ACCOUNT])
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetDestination(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_DESTINATION])
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetCallDestNr(fieldName string) string {
|
||||
return osipsev.GetDestination(fieldName)
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetCategory(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_CATEGORY], config.CgrConfig().DefaultCategory)
|
||||
}
|
||||
|
||||
func (osipsev *OsipsEvent) GetTenant(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_TENANT], config.CgrConfig().DefaultTenant)
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetReqType(fieldName string) string {
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
return fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[CGR_REQTYPE], config.CgrConfig().DefaultReqType)
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetSetupTime(fieldName string) (time.Time, error) {
|
||||
sTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS__SETUP_TIME])
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
sTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
} else if fieldName == utils.META_DEFAULT {
|
||||
sTimeStr = osipsev.osipsEvent.AttrValues[OSIPS__SETUP_TIME]
|
||||
}
|
||||
return utils.ParseTimeDetectLayout(sTimeStr)
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetAnswerTime(fieldName string) (time.Time, error) {
|
||||
aTimeStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[TIME])
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
aTimeStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
} else if fieldName == utils.META_DEFAULT {
|
||||
aTimeStr = osipsev.osipsEvent.AttrValues[TIME]
|
||||
}
|
||||
return utils.ParseTimeDetectLayout(aTimeStr)
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetEndTime() (time.Time, error) {
|
||||
var nilTime time.Time
|
||||
aTime, err := osipsev.GetAnswerTime(utils.META_DEFAULT)
|
||||
if err != nil {
|
||||
return nilTime, err
|
||||
}
|
||||
dur, err := osipsev.GetDuration(utils.META_DEFAULT)
|
||||
if err != nil {
|
||||
return nilTime, err
|
||||
}
|
||||
return aTime.Add(dur), nil
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetDuration(fieldName string) (time.Duration, error) {
|
||||
durStr := utils.FirstNonEmpty(osipsev.osipsEvent.AttrValues[fieldName], osipsev.osipsEvent.AttrValues[OSIPS_DURATION])
|
||||
if strings.HasPrefix(fieldName, utils.STATIC_VALUE_PREFIX) { // Static value
|
||||
durStr = fieldName[len(utils.STATIC_VALUE_PREFIX):]
|
||||
}
|
||||
return utils.ParseDurationWithSecs(durStr)
|
||||
}
|
||||
func (osipsev *OsipsEvent) MissingParameter() bool {
|
||||
var nilTime time.Time
|
||||
var nilDur time.Duration
|
||||
aTime, _ := osipsev.GetAnswerTime(utils.META_DEFAULT)
|
||||
dur, _ := osipsev.GetDuration(utils.META_DEFAULT)
|
||||
return len(osipsev.GetUUID()) == 0 ||
|
||||
len(osipsev.GetAccount(utils.META_DEFAULT)) == 0 ||
|
||||
len(osipsev.GetSubject(utils.META_DEFAULT)) == 0 ||
|
||||
len(osipsev.GetDestination(utils.META_DEFAULT)) == 0 ||
|
||||
aTime == nilTime ||
|
||||
dur == nilDur
|
||||
}
|
||||
func (osipsev *OsipsEvent) ParseEventValue(*utils.RSRField) string {
|
||||
return ""
|
||||
}
|
||||
func (osipsev *OsipsEvent) PassesFieldFilter(*utils.RSRField) (bool, string) {
|
||||
return false, ""
|
||||
}
|
||||
func (osipsev *OsipsEvent) GetExtraFields() map[string]string {
|
||||
primaryFields := []string{"to_tag", "setuptime", "created", "method", "callid", "sip_reason", "time", "sip_code", "duration", "from_tag",
|
||||
"cgr_tenant", "cgr_category", "cgr_reqtype", "cgr_account", "cgr_subject", "cgr_destination"}
|
||||
extraFields := make(map[string]string)
|
||||
for field, val := range osipsev.osipsEvent.AttrValues {
|
||||
if !utils.IsSliceMember(primaryFields, field) {
|
||||
extraFields[field] = val
|
||||
}
|
||||
}
|
||||
return extraFields
|
||||
}
|
||||
func (osipsEv *OsipsEvent) GetOriginatorIP() string {
|
||||
if osipsEv.osipsEvent == nil || osipsEv.osipsEvent.OriginatorAddress == nil {
|
||||
return ""
|
||||
}
|
||||
return osipsEv.osipsEvent.OriginatorAddress.IP.String()
|
||||
}
|
||||
func (osipsEv *OsipsEvent) AsStoredCdr() *utils.StoredCdr {
|
||||
storCdr := new(utils.StoredCdr)
|
||||
storCdr.CgrId = osipsEv.GetCgrId()
|
||||
storCdr.TOR = utils.VOICE
|
||||
storCdr.AccId = osipsEv.GetUUID()
|
||||
storCdr.CdrHost = osipsEv.GetOriginatorIP()
|
||||
storCdr.CdrSource = "OSIPS_" + osipsEv.GetName()
|
||||
storCdr.ReqType = osipsEv.GetReqType(utils.META_DEFAULT)
|
||||
storCdr.Direction = osipsEv.GetDirection(utils.META_DEFAULT)
|
||||
storCdr.Tenant = osipsEv.GetTenant(utils.META_DEFAULT)
|
||||
storCdr.Category = osipsEv.GetCategory(utils.META_DEFAULT)
|
||||
storCdr.Account = osipsEv.GetAccount(utils.META_DEFAULT)
|
||||
storCdr.Subject = osipsEv.GetSubject(utils.META_DEFAULT)
|
||||
storCdr.Destination = osipsEv.GetDestination(utils.META_DEFAULT)
|
||||
storCdr.SetupTime, _ = osipsEv.GetSetupTime(utils.META_DEFAULT)
|
||||
storCdr.AnswerTime, _ = osipsEv.GetAnswerTime(utils.META_DEFAULT)
|
||||
storCdr.Usage, _ = osipsEv.GetDuration(utils.META_DEFAULT)
|
||||
storCdr.ExtraFields = osipsEv.GetExtraFields()
|
||||
storCdr.Cost = -1
|
||||
return storCdr
|
||||
}
|
||||
135
sessionmanager/osipsevent_test.go
Normal file
135
sessionmanager/osipsevent_test.go
Normal file
@@ -0,0 +1,135 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"net"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/osipsdagram"
|
||||
)
|
||||
|
||||
var addr, _ = net.ResolveUDPAddr("udp", "172.16.254.77:42574")
|
||||
var osipsEv = &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_CDR",
|
||||
AttrValues: map[string]string{"to_tag": "4ea9687f", "cgr_account": "dan", "setuptime": "7", "created": "1406370492", "method": "INVITE", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ",
|
||||
"sip_reason": "OK", "time": "1406370499", "cgr_reqtype": "prepaid", "cgr_subject": "dan", "cgr_destination": "+4986517174963", "cgr_tenant": "itsyscom.com", "sip_code": "200",
|
||||
"duration": "20", "from_tag": "eb082607", "extra1": "val1", "extra2": "val2"}, OriginatorAddress: addr}}
|
||||
|
||||
func TestOsipsEventInterface(t *testing.T) {
|
||||
var _ Event = Event(osipsEv)
|
||||
}
|
||||
|
||||
func TestOsipsEventParseStatic(t *testing.T) {
|
||||
setupTime, _ := osipsEv.GetSetupTime("^2013-12-07 08:42:24")
|
||||
answerTime, _ := osipsEv.GetAnswerTime("^2013-12-07 08:42:24")
|
||||
dur, _ := osipsEv.GetDuration("^60s")
|
||||
if osipsEv.GetReqType("^test") != "test" ||
|
||||
osipsEv.GetDirection("^test") != "test" ||
|
||||
osipsEv.GetTenant("^test") != "test" ||
|
||||
osipsEv.GetCategory("^test") != "test" ||
|
||||
osipsEv.GetAccount("^test") != "test" ||
|
||||
osipsEv.GetSubject("^test") != "test" ||
|
||||
osipsEv.GetDestination("^test") != "test" ||
|
||||
setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) ||
|
||||
answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC) ||
|
||||
dur != time.Duration(60)*time.Second {
|
||||
t.Error("Values out of static not matching",
|
||||
osipsEv.GetReqType("^test") != "test",
|
||||
osipsEv.GetDirection("^test") != "test",
|
||||
osipsEv.GetTenant("^test") != "test",
|
||||
osipsEv.GetCategory("^test") != "test",
|
||||
osipsEv.GetAccount("^test") != "test",
|
||||
osipsEv.GetSubject("^test") != "test",
|
||||
osipsEv.GetDestination("^test") != "test",
|
||||
setupTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC),
|
||||
answerTime != time.Date(2013, 12, 7, 8, 42, 24, 0, time.UTC),
|
||||
dur != time.Duration(60)*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOsipsEventGetValues(t *testing.T) {
|
||||
cfg, _ = config.NewDefaultCGRConfig()
|
||||
config.SetCgrConfig(cfg)
|
||||
setupTime, _ := osipsEv.GetSetupTime(utils.META_DEFAULT)
|
||||
answerTime, _ := osipsEv.GetAnswerTime(utils.META_DEFAULT)
|
||||
endTime, _ := osipsEv.GetEndTime()
|
||||
dur, _ := osipsEv.GetDuration(utils.META_DEFAULT)
|
||||
if osipsEv.GetName() != "E_ACC_CDR" ||
|
||||
osipsEv.GetCgrId() != utils.Sha1("ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ"+";"+"eb082607"+";"+"4ea9687f", setupTime.UTC().String()) ||
|
||||
osipsEv.GetUUID() != "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ;eb082607;4ea9687f" ||
|
||||
osipsEv.GetDirection(utils.META_DEFAULT) != utils.OUT ||
|
||||
osipsEv.GetSubject(utils.META_DEFAULT) != "dan" ||
|
||||
osipsEv.GetAccount(utils.META_DEFAULT) != "dan" ||
|
||||
osipsEv.GetDestination(utils.META_DEFAULT) != "+4986517174963" ||
|
||||
osipsEv.GetCallDestNr(utils.META_DEFAULT) != "+4986517174963" ||
|
||||
osipsEv.GetCategory(utils.META_DEFAULT) != cfg.DefaultCategory ||
|
||||
osipsEv.GetTenant(utils.META_DEFAULT) != "itsyscom.com" ||
|
||||
osipsEv.GetReqType(utils.META_DEFAULT) != "prepaid" ||
|
||||
setupTime != time.Date(2014, 7, 26, 12, 28, 12, 0, time.Local) ||
|
||||
answerTime != time.Date(2014, 7, 26, 12, 28, 19, 0, time.Local) ||
|
||||
endTime != time.Date(2014, 7, 26, 12, 28, 39, 0, time.Local) ||
|
||||
dur != time.Duration(20*time.Second) ||
|
||||
osipsEv.GetOriginatorIP() != "172.16.254.77" {
|
||||
t.Error("GetValues not matching: ", osipsEv.GetName() != "E_ACC_CDR",
|
||||
osipsEv.GetCgrId() != utils.Sha1("ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ"+";"+"eb082607"+";"+"4ea9687f", setupTime.UTC().String()),
|
||||
osipsEv.GetUUID() != "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ;eb082607;4ea9687f",
|
||||
osipsEv.GetDirection(utils.META_DEFAULT) != utils.OUT,
|
||||
osipsEv.GetSubject(utils.META_DEFAULT) != "dan",
|
||||
osipsEv.GetAccount(utils.META_DEFAULT) != "dan",
|
||||
osipsEv.GetDestination(utils.META_DEFAULT) != "+4986517174963",
|
||||
osipsEv.GetCallDestNr(utils.META_DEFAULT) != "+4986517174963",
|
||||
osipsEv.GetCategory(utils.META_DEFAULT) != cfg.DefaultCategory,
|
||||
osipsEv.GetTenant(utils.META_DEFAULT) != "itsyscom.com",
|
||||
osipsEv.GetReqType(utils.META_DEFAULT) != "prepaid",
|
||||
setupTime != time.Date(2014, 7, 26, 12, 28, 12, 0, time.Local),
|
||||
answerTime != time.Date(2014, 7, 26, 12, 28, 19, 0, time.Local),
|
||||
endTime != time.Date(2014, 7, 26, 12, 28, 39, 0, time.Local),
|
||||
dur != time.Duration(20*time.Second),
|
||||
osipsEv.GetOriginatorIP() != "172.16.254.77",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOsipsEventMissingParameter(t *testing.T) {
|
||||
if osipsEv.MissingParameter() {
|
||||
t.Errorf("Wrongly detected missing parameter: %+v", osipsEv)
|
||||
}
|
||||
osipsEv2 := &OsipsEvent{osipsEvent: &osipsdagram.OsipsEvent{Name: "E_ACC_CDR",
|
||||
AttrValues: map[string]string{"to_tag": "4ea9687f", "cgr_account": "dan", "setuptime": "7", "created": "1406370492", "method": "INVITE", "callid": "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ",
|
||||
"sip_reason": "OK", "time": "1406370499", "cgr_reqtype": "prepaid", "cgr_subject": "dan", "cgr_tenant": "itsyscom.com", "sip_code": "200",
|
||||
"duration": "20", "from_tag": "eb082607"}}}
|
||||
if !osipsEv2.MissingParameter() {
|
||||
t.Error("Failed to detect missing parameter.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOsipsEventAsStoredCdr(t *testing.T) {
|
||||
eStoredCdr := &utils.StoredCdr{CgrId: utils.Sha1("ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ;eb082607;4ea9687f", time.Date(2014, 7, 26, 12, 28, 12, 0, time.Local).UTC().String()),
|
||||
TOR: utils.VOICE, AccId: "ODVkMDI2Mzc2MDY5N2EzODhjNTAzNTdlODhiZjRlYWQ;eb082607;4ea9687f", CdrHost: "172.16.254.77", CdrSource: "OSIPS_E_ACC_CDR", ReqType: "prepaid",
|
||||
Direction: utils.OUT, Tenant: "itsyscom.com", Category: "call", Account: "dan", Subject: "dan",
|
||||
Destination: "+4986517174963", SetupTime: time.Date(2014, 7, 26, 12, 28, 12, 0, time.Local), AnswerTime: time.Date(2014, 7, 26, 12, 28, 19, 0, time.Local),
|
||||
Usage: time.Duration(20) * time.Second, ExtraFields: map[string]string{"extra1": "val1", "extra2": "val2"}, Cost: -1}
|
||||
if storedCdr := osipsEv.AsStoredCdr(); !reflect.DeepEqual(eStoredCdr, storedCdr) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eStoredCdr, storedCdr)
|
||||
}
|
||||
}
|
||||
146
sessionmanager/osipssm.go
Normal file
146
sessionmanager/osipssm.go
Normal file
@@ -0,0 +1,146 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package sessionmanager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/osipsdagram"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewOSipsSessionManager(cfg *config.CGRConfig, cdrsrv engine.Connector) (*OsipsSessionManager, error) {
|
||||
osm := &OsipsSessionManager{cgrCfg: cfg, cdrsrv: cdrsrv}
|
||||
osm.eventHandlers = map[string][]func(*osipsdagram.OsipsEvent){
|
||||
"E_OPENSIPS_START": []func(*osipsdagram.OsipsEvent){osm.OnOpensipsStart},
|
||||
"E_ACC_CDR": []func(*osipsdagram.OsipsEvent){osm.OnCdr},
|
||||
}
|
||||
return osm, nil
|
||||
}
|
||||
|
||||
type OsipsSessionManager struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
cdrsrv engine.Connector
|
||||
eventHandlers map[string][]func(*osipsdagram.OsipsEvent)
|
||||
evSubscribeStop *chan struct{} // Reference towards the channel controlling subscriptions, keep it as reference so we do not need to copy it
|
||||
stopServing chan struct{} // Stop serving datagrams
|
||||
miConn *osipsdagram.OsipsMiDatagramConnector
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) Connect() (err error) {
|
||||
osm.stopServing = make(chan struct{})
|
||||
if osm.miConn, err = osipsdagram.NewOsipsMiDatagramConnector(osm.cgrCfg.OsipsMiAddr, osm.cgrCfg.OsipsReconnects); err != nil {
|
||||
return fmt.Errorf("Cannot connect to OpenSIPS at %s, error: %s", osm.cgrCfg.OsipsMiAddr, err.Error())
|
||||
}
|
||||
evSubscribeStop := make(chan struct{})
|
||||
osm.evSubscribeStop = &evSubscribeStop
|
||||
defer close(*osm.evSubscribeStop) // Stop subscribing on disconnect
|
||||
go osm.SubscribeEvents(evSubscribeStop)
|
||||
evsrv, err := osipsdagram.NewEventServer(osm.cgrCfg.OsipsListenUdp, osm.eventHandlers)
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Cannot initialize datagram server, error: <%s>", err.Error()))
|
||||
return
|
||||
}
|
||||
engine.Logger.Info(fmt.Sprintf("<SM-OpenSIPS> Listening for datagram events at <%s>", osm.cgrCfg.OsipsListenUdp))
|
||||
evsrv.ServeEvents(osm.stopServing) // Will break through stopServing on error in other places
|
||||
return errors.New("<SM-OpenSIPS> Stopped reading events")
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) DisconnectSession(uuid string, notify string) {
|
||||
return
|
||||
}
|
||||
func (osm *OsipsSessionManager) RemoveSession(uuid string) {
|
||||
return
|
||||
}
|
||||
func (osm *OsipsSessionManager) MaxDebit(cd *engine.CallDescriptor, cc *engine.CallCost) error {
|
||||
return nil
|
||||
}
|
||||
func (osm *OsipsSessionManager) GetDebitPeriod() time.Duration {
|
||||
var nilDuration time.Duration
|
||||
return nilDuration
|
||||
}
|
||||
func (osm *OsipsSessionManager) GetDbLogger() engine.LogStorage {
|
||||
return nil
|
||||
}
|
||||
func (osm *OsipsSessionManager) Shutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Event Handlers
|
||||
|
||||
// Automatic subscribe to OpenSIPS for events, trigered on Connect or OpenSIPS restart
|
||||
func (osm *OsipsSessionManager) SubscribeEvents(evStop chan struct{}) error {
|
||||
for {
|
||||
select {
|
||||
case <-evStop: // Break this loop from outside
|
||||
return nil
|
||||
default:
|
||||
subscribeInterval := osm.cgrCfg.OsipsEvSubscInterval + time.Duration(1)*time.Second // Avoid concurrency on expiry
|
||||
listenAddrSplt := strings.Split(osm.cgrCfg.OsipsListenUdp, ":")
|
||||
portListen := listenAddrSplt[1]
|
||||
addrListen := listenAddrSplt[0]
|
||||
if len(addrListen) == 0 { //Listen on all addresses, try finding out from mi connection
|
||||
if localAddr := osm.miConn.LocallAddr(); localAddr != nil {
|
||||
addrListen = strings.Split(localAddr.String(), ":")[1]
|
||||
}
|
||||
}
|
||||
for eventName := range osm.eventHandlers {
|
||||
if eventName == "E_OPENSIPS_START" { // Do not subscribe for start since this should be hardcoded
|
||||
continue
|
||||
}
|
||||
cmd := fmt.Sprintf(":event_subscribe:\n%s\nudp:%s:%s\n%d\n", eventName, addrListen, portListen, int(subscribeInterval.Seconds()))
|
||||
success := false
|
||||
for attempts := 0; attempts < osm.cgrCfg.OsipsReconnects; attempts++ {
|
||||
if reply, err := osm.miConn.SendCommand([]byte(cmd)); err == nil && bytes.HasPrefix(reply, []byte("200 OK")) {
|
||||
success = true
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration((attempts+1)/2) * time.Second) // Allow OpenSIPS to recover from errors
|
||||
continue // Try again
|
||||
}
|
||||
if !success {
|
||||
close(osm.stopServing) // Do not serve anymore since we got errors on subscribing
|
||||
return errors.New("Failed subscribing to OpenSIPS events")
|
||||
}
|
||||
}
|
||||
time.Sleep(osm.cgrCfg.OsipsEvSubscInterval)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) OnOpensipsStart(cdrDagram *osipsdagram.OsipsEvent) {
|
||||
close(*osm.evSubscribeStop) // Cancel previous subscribes
|
||||
evStop := make(chan struct{})
|
||||
osm.evSubscribeStop = &evStop
|
||||
go osm.SubscribeEvents(evStop)
|
||||
}
|
||||
|
||||
func (osm *OsipsSessionManager) OnCdr(cdrDagram *osipsdagram.OsipsEvent) {
|
||||
var reply string
|
||||
osipsEv, _ := NewOsipsEvent(cdrDagram)
|
||||
storedCdr := osipsEv.AsStoredCdr()
|
||||
if err := osm.cdrsrv.ProcessCdr(storedCdr, &reply); err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> Failed processing CDR, cgrid: %s, accid: %s, error: <%s>", storedCdr.CgrId, storedCdr.AccId, err.Error()))
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
Real-time Charging System for Telecom & ISP environments
|
||||
Copyright (C) 2012-2014 ITsysCOM GmbH
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@@ -21,12 +21,11 @@ package sessionmanager
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
type SessionManager interface {
|
||||
Connect(*config.CGRConfig) error
|
||||
Connect() error
|
||||
DisconnectSession(string, string)
|
||||
RemoveSession(string)
|
||||
MaxDebit(*engine.CallDescriptor, *engine.CallCost) error
|
||||
|
||||
8
test.sh
8
test.sh
@@ -4,10 +4,8 @@ go test -i github.com/cgrates/cgrates/engine
|
||||
go test -i github.com/cgrates/cgrates/sessionmanager
|
||||
go test -i github.com/cgrates/cgrates/config
|
||||
go test -i github.com/cgrates/cgrates/cmd/cgr-engine
|
||||
go test -i github.com/cgrates/cgrates/mediator
|
||||
go test -i github.com/cgrates/fsock
|
||||
go test -i github.com/cgrates/cgrates/cache2go
|
||||
go test -i github.com/cgrates/cgrates/cdrs
|
||||
go test -i github.com/cgrates/cgrates/cdrc
|
||||
go test -i github.com/cgrates/cgrates/utils
|
||||
go test -i github.com/cgrates/cgrates/history
|
||||
@@ -23,10 +21,6 @@ go test github.com/cgrates/cgrates/config
|
||||
cfg=$?
|
||||
go test github.com/cgrates/cgrates/cmd/cgr-engine
|
||||
cr=$?
|
||||
go test github.com/cgrates/cgrates/mediator
|
||||
md=$?
|
||||
go test github.com/cgrates/cgrates/cdrs
|
||||
cdrs=$?
|
||||
go test github.com/cgrates/cgrates/cdrc
|
||||
cdrcs=$?
|
||||
go test github.com/cgrates/cgrates/utils
|
||||
@@ -41,4 +35,4 @@ go test github.com/cgrates/cgrates/cdre
|
||||
cdre=$?
|
||||
|
||||
|
||||
exit $en && $gt && $sm && $cfg && $bl && $cr && $md && $cdrs && $cdrc && $fs && $ut && $hs && $c2g && $cdre
|
||||
exit $en && $gt && $sm && $cfg && $bl && $cr && $cdrc && $fs && $ut && $hs && $c2g && $cdre
|
||||
|
||||
281
utils/consts.go
281
utils/consts.go
@@ -1,134 +1,159 @@
|
||||
package utils
|
||||
|
||||
const (
|
||||
VERSION = "0.9.1rc5"
|
||||
POSTGRES = "postgres"
|
||||
MYSQL = "mysql"
|
||||
MONGO = "mongo"
|
||||
REDIS = "redis"
|
||||
LOCALHOST = "127.0.0.1"
|
||||
FSCDR_FILE_CSV = "freeswitch_file_csv"
|
||||
FSCDR_HTTP_JSON = "freeswitch_http_json"
|
||||
NOT_IMPLEMENTED = "not implemented"
|
||||
PREPAID = "prepaid"
|
||||
POSTPAID = "postpaid"
|
||||
PSEUDOPREPAID = "pseudoprepaid"
|
||||
RATED = "rated"
|
||||
ERR_NOT_IMPLEMENTED = "NOT_IMPLEMENTED"
|
||||
ERR_SERVER_ERROR = "SERVER_ERROR"
|
||||
ERR_NOT_FOUND = "NOT_FOUND"
|
||||
ERR_MANDATORY_IE_MISSING = "MANDATORY_IE_MISSING"
|
||||
ERR_EXISTS = "EXISTS"
|
||||
ERR_BROKEN_REFERENCE = "BROKEN_REFERENCE"
|
||||
ERR_PARSER_ERROR = "PARSER_ERROR"
|
||||
TBL_TP_TIMINGS = "tp_timings"
|
||||
TBL_TP_DESTINATIONS = "tp_destinations"
|
||||
TBL_TP_RATES = "tp_rates"
|
||||
TBL_TP_DESTINATION_RATES = "tp_destination_rates"
|
||||
TBL_TP_RATING_PLANS = "tp_rating_plans"
|
||||
TBL_TP_RATE_PROFILES = "tp_rating_profiles"
|
||||
TBL_TP_SHARED_GROUPS = "tp_shared_groups"
|
||||
TBL_TP_LCRS = "tp_lcr_rules"
|
||||
TBL_TP_ACTIONS = "tp_actions"
|
||||
TBL_TP_ACTION_PLANS = "tp_action_plans"
|
||||
TBL_TP_ACTION_TRIGGERS = "tp_action_triggers"
|
||||
TBL_TP_ACCOUNT_ACTIONS = "tp_account_actions"
|
||||
TBL_CDRS_PRIMARY = "cdrs_primary"
|
||||
TBL_CDRS_EXTRA = "cdrs_extra"
|
||||
TBL_COST_DETAILS = "cost_details"
|
||||
TBL_RATED_CDRS = "rated_cdrs"
|
||||
TIMINGS_CSV = "Timings.csv"
|
||||
DESTINATIONS_CSV = "Destinations.csv"
|
||||
RATES_CSV = "Rates.csv"
|
||||
DESTINATION_RATES_CSV = "DestinationRates.csv"
|
||||
RATING_PLANS_CSV = "RatingPlans.csv"
|
||||
RATING_PROFILES_CSV = "RatingProfiles.csv"
|
||||
SHARED_GROUPS_CSV = "SharedGroups.csv"
|
||||
LCRS_CSV = "LCRRules.csv"
|
||||
ACTIONS_CSV = "Actions.csv"
|
||||
ACTION_PLANS_CSV = "ActionPlans.csv"
|
||||
ACTION_TRIGGERS_CSV = "ActionTriggers.csv"
|
||||
ACCOUNT_ACTIONS_CSV = "AccountActions.csv"
|
||||
DERIVED_CHARGERS_CSV = "DerivedChargers.csv"
|
||||
TIMINGS_NRCOLS = 6
|
||||
DESTINATIONS_NRCOLS = 2
|
||||
RATES_NRCOLS = 6
|
||||
DESTINATION_RATES_NRCOLS = 5
|
||||
DESTRATE_TIMINGS_NRCOLS = 4
|
||||
RATE_PROFILES_NRCOLS = 7
|
||||
SHARED_GROUPS_NRCOLS = 4
|
||||
LCRS_NRCOLS = 9
|
||||
ACTIONS_NRCOLS = 12
|
||||
ACTION_PLANS_NRCOLS = 4
|
||||
ACTION_TRIGGERS_NRCOLS = 9
|
||||
ACCOUNT_ACTIONS_NRCOLS = 5
|
||||
DERIVED_CHARGERS_NRCOLS = 17
|
||||
ROUNDING_UP = "*up"
|
||||
ROUNDING_MIDDLE = "*middle"
|
||||
ROUNDING_DOWN = "*down"
|
||||
ANY = "*any"
|
||||
COMMENT_CHAR = '#'
|
||||
CSV_SEP = ','
|
||||
FALLBACK_SEP = ';'
|
||||
INFIELD_SEP = ";"
|
||||
FIELDS_SEP = ","
|
||||
REGEXP_PREFIX = "~"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
MSGPACK = "msgpack"
|
||||
CSV_LOAD = "CSVLOAD"
|
||||
CGRID = "cgrid"
|
||||
ORDERID = "orderid"
|
||||
ACCID = "accid"
|
||||
CDRHOST = "cdrhost"
|
||||
CDRSOURCE = "cdrsource"
|
||||
REQTYPE = "reqtype"
|
||||
DIRECTION = "direction"
|
||||
TENANT = "tenant"
|
||||
CATEGORY = "category"
|
||||
ACCOUNT = "account"
|
||||
SUBJECT = "subject"
|
||||
DESTINATION = "destination"
|
||||
SETUP_TIME = "setup_time"
|
||||
ANSWER_TIME = "answer_time"
|
||||
USAGE = "usage"
|
||||
MEDI_RUNID = "mediation_runid"
|
||||
RATED_ACCOUNT = "rated_account"
|
||||
RATED_SUBJECT = "rated_subject"
|
||||
COST = "cost"
|
||||
DEFAULT_RUNID = "default"
|
||||
STATIC_VALUE_PREFIX = "^"
|
||||
CSV = "csv"
|
||||
CDRE_DRYRUN = "dry_run"
|
||||
INTERNAL = "internal"
|
||||
ZERO_RATING_SUBJECT_PREFIX = "*zero"
|
||||
OK = "OK"
|
||||
CDRE_FIXED_WIDTH = "fwv"
|
||||
XML_PROFILE_PREFIX = "*xml:"
|
||||
CDRE = "cdre"
|
||||
CDRC = "cdrc"
|
||||
MASK_CHAR = "*"
|
||||
CONCATENATED_KEY_SEP = ":"
|
||||
META_DEFAULT = "*default"
|
||||
FORKED_CDR = "forked_cdr"
|
||||
UNIT_TEST = "UNIT_TEST"
|
||||
HDR_VAL_SEP = "/"
|
||||
MONETARY = "*monetary"
|
||||
SMS = "*sms"
|
||||
DATA = "*data"
|
||||
VOICE = "*voice"
|
||||
TOR = "tor"
|
||||
HOURS = "hours"
|
||||
MINUTES = "minutes"
|
||||
NANOSECONDS = "nanoseconds"
|
||||
SECONDS = "seconds"
|
||||
OUT = "*out"
|
||||
CDR_IMPORT = "cdr_import"
|
||||
CDR_EXPORT = "cdr_export"
|
||||
CDRFIELD = "cdrfield"
|
||||
ASR = "ASR"
|
||||
ACD = "ACD"
|
||||
FILTER_REGEXP_TPL = "$1$2$3$4$5"
|
||||
VERSION = "0.9.1rc5"
|
||||
POSTGRES = "postgres"
|
||||
MYSQL = "mysql"
|
||||
MONGO = "mongo"
|
||||
REDIS = "redis"
|
||||
LOCALHOST = "127.0.0.1"
|
||||
FSCDR_FILE_CSV = "freeswitch_file_csv"
|
||||
FSCDR_HTTP_JSON = "freeswitch_http_json"
|
||||
NOT_IMPLEMENTED = "not implemented"
|
||||
PREPAID = "prepaid"
|
||||
POSTPAID = "postpaid"
|
||||
PSEUDOPREPAID = "pseudoprepaid"
|
||||
RATED = "rated"
|
||||
ERR_NOT_IMPLEMENTED = "NOT_IMPLEMENTED"
|
||||
ERR_SERVER_ERROR = "SERVER_ERROR"
|
||||
ERR_NOT_FOUND = "NOT_FOUND"
|
||||
ERR_MANDATORY_IE_MISSING = "MANDATORY_IE_MISSING"
|
||||
ERR_EXISTS = "EXISTS"
|
||||
ERR_BROKEN_REFERENCE = "BROKEN_REFERENCE"
|
||||
ERR_PARSER_ERROR = "PARSER_ERROR"
|
||||
TBL_TP_TIMINGS = "tp_timings"
|
||||
TBL_TP_DESTINATIONS = "tp_destinations"
|
||||
TBL_TP_RATES = "tp_rates"
|
||||
TBL_TP_DESTINATION_RATES = "tp_destination_rates"
|
||||
TBL_TP_RATING_PLANS = "tp_rating_plans"
|
||||
TBL_TP_RATE_PROFILES = "tp_rating_profiles"
|
||||
TBL_TP_SHARED_GROUPS = "tp_shared_groups"
|
||||
TBL_TP_LCRS = "tp_lcr_rules"
|
||||
TBL_TP_ACTIONS = "tp_actions"
|
||||
TBL_TP_ACTION_PLANS = "tp_action_plans"
|
||||
TBL_TP_ACTION_TRIGGERS = "tp_action_triggers"
|
||||
TBL_TP_ACCOUNT_ACTIONS = "tp_account_actions"
|
||||
TBL_CDRS_PRIMARY = "cdrs_primary"
|
||||
TBL_CDRS_EXTRA = "cdrs_extra"
|
||||
TBL_COST_DETAILS = "cost_details"
|
||||
TBL_RATED_CDRS = "rated_cdrs"
|
||||
TIMINGS_CSV = "Timings.csv"
|
||||
DESTINATIONS_CSV = "Destinations.csv"
|
||||
RATES_CSV = "Rates.csv"
|
||||
DESTINATION_RATES_CSV = "DestinationRates.csv"
|
||||
RATING_PLANS_CSV = "RatingPlans.csv"
|
||||
RATING_PROFILES_CSV = "RatingProfiles.csv"
|
||||
SHARED_GROUPS_CSV = "SharedGroups.csv"
|
||||
LCRS_CSV = "LCRRules.csv"
|
||||
ACTIONS_CSV = "Actions.csv"
|
||||
ACTION_PLANS_CSV = "ActionPlans.csv"
|
||||
ACTION_TRIGGERS_CSV = "ActionTriggers.csv"
|
||||
ACCOUNT_ACTIONS_CSV = "AccountActions.csv"
|
||||
DERIVED_CHARGERS_CSV = "DerivedChargers.csv"
|
||||
TIMINGS_NRCOLS = 6
|
||||
DESTINATIONS_NRCOLS = 2
|
||||
RATES_NRCOLS = 6
|
||||
DESTINATION_RATES_NRCOLS = 5
|
||||
DESTRATE_TIMINGS_NRCOLS = 4
|
||||
RATE_PROFILES_NRCOLS = 7
|
||||
SHARED_GROUPS_NRCOLS = 4
|
||||
LCRS_NRCOLS = 9
|
||||
ACTIONS_NRCOLS = 12
|
||||
ACTION_PLANS_NRCOLS = 4
|
||||
ACTION_TRIGGERS_NRCOLS = 9
|
||||
ACCOUNT_ACTIONS_NRCOLS = 5
|
||||
DERIVED_CHARGERS_NRCOLS = 17
|
||||
ROUNDING_UP = "*up"
|
||||
ROUNDING_MIDDLE = "*middle"
|
||||
ROUNDING_DOWN = "*down"
|
||||
ANY = "*any"
|
||||
COMMENT_CHAR = '#'
|
||||
CSV_SEP = ','
|
||||
FALLBACK_SEP = ';'
|
||||
INFIELD_SEP = ";"
|
||||
FIELDS_SEP = ","
|
||||
REGEXP_PREFIX = "~"
|
||||
JSON = "json"
|
||||
GOB = "gob"
|
||||
MSGPACK = "msgpack"
|
||||
CSV_LOAD = "CSVLOAD"
|
||||
CGRID = "cgrid"
|
||||
ORDERID = "orderid"
|
||||
ACCID = "accid"
|
||||
CDRHOST = "cdrhost"
|
||||
CDRSOURCE = "cdrsource"
|
||||
REQTYPE = "reqtype"
|
||||
DIRECTION = "direction"
|
||||
TENANT = "tenant"
|
||||
CATEGORY = "category"
|
||||
ACCOUNT = "account"
|
||||
SUBJECT = "subject"
|
||||
DESTINATION = "destination"
|
||||
SETUP_TIME = "setup_time"
|
||||
ANSWER_TIME = "answer_time"
|
||||
USAGE = "usage"
|
||||
MEDI_RUNID = "mediation_runid"
|
||||
RATED_ACCOUNT = "rated_account"
|
||||
RATED_SUBJECT = "rated_subject"
|
||||
COST = "cost"
|
||||
DEFAULT_RUNID = "default"
|
||||
STATIC_VALUE_PREFIX = "^"
|
||||
CSV = "csv"
|
||||
CDRE_DRYRUN = "dry_run"
|
||||
INTERNAL = "internal"
|
||||
ZERO_RATING_SUBJECT_PREFIX = "*zero"
|
||||
OK = "OK"
|
||||
CDRE_FIXED_WIDTH = "fwv"
|
||||
XML_PROFILE_PREFIX = "*xml:"
|
||||
CDRE = "cdre"
|
||||
CDRC = "cdrc"
|
||||
MASK_CHAR = "*"
|
||||
CONCATENATED_KEY_SEP = ":"
|
||||
META_DEFAULT = "*default"
|
||||
FORKED_CDR = "forked_cdr"
|
||||
UNIT_TEST = "UNIT_TEST"
|
||||
HDR_VAL_SEP = "/"
|
||||
MONETARY = "*monetary"
|
||||
SMS = "*sms"
|
||||
DATA = "*data"
|
||||
VOICE = "*voice"
|
||||
TOR = "tor"
|
||||
HOURS = "hours"
|
||||
MINUTES = "minutes"
|
||||
NANOSECONDS = "nanoseconds"
|
||||
SECONDS = "seconds"
|
||||
OUT = "*out"
|
||||
CDR_IMPORT = "cdr_import"
|
||||
CDR_EXPORT = "cdr_export"
|
||||
CDRFIELD = "cdrfield"
|
||||
ASR = "ASR"
|
||||
ACD = "ACD"
|
||||
FILTER_REGEXP_TPL = "$1$2$3$4$5"
|
||||
ACTION_TIMING_PREFIX = "apl_"
|
||||
RATING_PLAN_PREFIX = "rpl_"
|
||||
RATING_PROFILE_PREFIX = "rpf_"
|
||||
RP_ALIAS_PREFIX = "ral_"
|
||||
ACC_ALIAS_PREFIX = "aal_"
|
||||
ACTION_PREFIX = "act_"
|
||||
SHARED_GROUP_PREFIX = "shg_"
|
||||
ACCOUNT_PREFIX = "ubl_"
|
||||
DESTINATION_PREFIX = "dst_"
|
||||
LCR_PREFIX = "lcr_"
|
||||
DERIVEDCHARGERS_PREFIX = "dcs_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
LOG_ACTION_TIMMING_PREFIX = "ltm_"
|
||||
LOG_ACTION_TRIGGER_PREFIX = "ltr_"
|
||||
LOG_ERR = "ler_"
|
||||
LOG_CDR = "cdr_"
|
||||
LOG_MEDIATED_CDR = "mcd_"
|
||||
SESSION_MANAGER_SOURCE = "SMR"
|
||||
MEDIATOR_SOURCE = "MED"
|
||||
SCHED_SOURCE = "SCH"
|
||||
RATER_SOURCE = "RAT"
|
||||
CREATE_CDRS_TABLES_SQL = "create_cdrs_tables.sql"
|
||||
CREATE_TARIFFPLAN_TABLES_SQL = "create_tariffplan_tables.sql"
|
||||
TEST_SQL = "TEST_SQL"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
Reference in New Issue
Block a user