mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
@@ -258,36 +258,6 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, e
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// attrStoExpThdStat will process a CGREvent with the configured subsystems
|
||||
func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher,
|
||||
attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
|
||||
if attrS {
|
||||
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if thdS {
|
||||
go cdrS.thdSProcessEvent(cgrEv)
|
||||
}
|
||||
if statS {
|
||||
go cdrS.statSProcessEvent(cgrEv)
|
||||
}
|
||||
var cdr *CDR
|
||||
if cdr, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
|
||||
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
if store {
|
||||
if err = cdrS.cdrDb.SetCDR(cdr, allowUpdate); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if export {
|
||||
go cdrS.exportCDRs([]*CDR{cdr})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// rateCDRWithErr rates a CDR including errors
|
||||
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) {
|
||||
var err error
|
||||
@@ -321,52 +291,6 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err
|
||||
return
|
||||
}
|
||||
|
||||
// chrgProcessEvent will process the CGREvent with ChargerS subsystem
|
||||
// it is designed to run in it's own goroutine
|
||||
func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
|
||||
attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
|
||||
var chrgrs []*ChrgSProcessEventReply
|
||||
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ChargerSConns, nil,
|
||||
utils.ChargerSv1ProcessEvent,
|
||||
cgrEv, &chrgrs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
|
||||
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
|
||||
return
|
||||
}
|
||||
var partExec bool
|
||||
for _, chrgr := range chrgrs {
|
||||
cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg,
|
||||
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s converting event: %+v as CDR",
|
||||
utils.CDRs, err.Error(), cgrEv))
|
||||
partExec = true
|
||||
continue
|
||||
}
|
||||
for _, rtCDR := range cdrS.rateCDRWithErr(
|
||||
&CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) {
|
||||
arg := &utils.CGREventWithArgDispatcher{
|
||||
CGREvent: rtCDR.AsCGREvent(),
|
||||
ArgDispatcher: cgrEv.ArgDispatcher,
|
||||
}
|
||||
if errProc := cdrS.attrStoExpThdStat(arg,
|
||||
attrS, store, allowUpdate, export, thdS, statS); errProc != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s",
|
||||
utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS))
|
||||
partExec = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
if partExec {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles
|
||||
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) {
|
||||
var chrgrs []*ChrgSProcessEventReply
|
||||
@@ -392,7 +316,9 @@ func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher
|
||||
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
|
||||
var rplyEv AttrSProcessEventReply
|
||||
attrArgs := &AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer(utils.MetaCDRs),
|
||||
Context: utils.StringPointer(utils.FirstNonEmpty(
|
||||
utils.IfaceAsString(cgrEv.CGREvent.Event[utils.Context]),
|
||||
utils.MetaCDRs)),
|
||||
CGREvent: cgrEv.CGREvent}
|
||||
if cgrEv.ArgDispatcher != nil {
|
||||
attrArgs.ArgDispatcher = cgrEv.ArgDispatcher
|
||||
|
||||
@@ -122,8 +122,10 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithArgDispatcher) (
|
||||
}
|
||||
|
||||
args := &AttrArgsProcessEvent{
|
||||
AttributeIDs: cP.AttributeIDs,
|
||||
Context: utils.StringPointer(utils.MetaChargers),
|
||||
AttributeIDs: cP.AttributeIDs,
|
||||
Context: utils.StringPointer(utils.FirstNonEmpty(
|
||||
utils.IfaceAsString(clonedEv.CGREvent.Event[utils.Context]),
|
||||
utils.MetaChargers)),
|
||||
ProcessRuns: nil,
|
||||
CGREvent: clonedEv.CGREvent,
|
||||
ArgDispatcher: clonedEv.ArgDispatcher,
|
||||
|
||||
@@ -579,7 +579,9 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted
|
||||
}
|
||||
if len(spS.cgrcfg.SupplierSCfg().AttributeSConns) != 0 {
|
||||
attrArgs := &AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer(utils.MetaSuppliers),
|
||||
Context: utils.StringPointer(utils.FirstNonEmpty(
|
||||
utils.IfaceAsString(args.CGREvent.Event[utils.Context]),
|
||||
utils.MetaSuppliers)),
|
||||
CGREvent: args.CGREvent,
|
||||
ArgDispatcher: args.ArgDispatcher,
|
||||
}
|
||||
|
||||
75
ers/kafka_test.go
Normal file
75
ers/kafka_test.go
Normal file
@@ -0,0 +1,75 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestKafkaSetURL(t *testing.T) {
|
||||
k := new(KafkaER)
|
||||
expKafka := &KafkaER{
|
||||
dialURL: "localhost:2013",
|
||||
topic: "cdrs",
|
||||
groupID: "new",
|
||||
maxWait: time.Second,
|
||||
}
|
||||
url := "localhost:2013?topic=cdrs&group_id=new&max_wait=1s"
|
||||
if err := k.setURL(url); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if expKafka.dialURL != k.dialURL {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
|
||||
} else if expKafka.topic != k.topic {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic)
|
||||
} else if expKafka.groupID != k.groupID {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID)
|
||||
} else if expKafka.maxWait != k.maxWait {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
|
||||
}
|
||||
k = new(KafkaER)
|
||||
expKafka = &KafkaER{
|
||||
dialURL: "localhost:2013",
|
||||
topic: "cgrates",
|
||||
groupID: "cgrates",
|
||||
maxWait: time.Millisecond,
|
||||
}
|
||||
url = "localhost:2013"
|
||||
if err := k.setURL(url); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if expKafka.dialURL != k.dialURL {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
|
||||
} else if expKafka.topic != k.topic {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic)
|
||||
} else if expKafka.groupID != k.groupID {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID)
|
||||
} else if expKafka.maxWait != k.maxWait {
|
||||
t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
|
||||
}
|
||||
k = new(KafkaER)
|
||||
expKafka = &KafkaER{
|
||||
dialURL: "localhost:2013",
|
||||
topic: "cgrates",
|
||||
groupID: "cgrates",
|
||||
maxWait: time.Millisecond,
|
||||
}
|
||||
if err := k.setURL(":"); err == nil {
|
||||
t.Errorf("Expected error received: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -260,7 +260,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
|
||||
} else {
|
||||
outURL = strings.TrimPrefix(outURL, utils.Meta)
|
||||
var oURL *url.URL
|
||||
if oURL, err = url.Parse(inURL); err != nil {
|
||||
if oURL, err = url.Parse(outURL); err != nil {
|
||||
return
|
||||
}
|
||||
rdr.expConnType = oURL.Scheme
|
||||
@@ -292,7 +292,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
|
||||
rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
|
||||
outHost, outPort, outDBname, outUser, outPassword, outSSL)
|
||||
default:
|
||||
return fmt.Errorf("unknown db_type")
|
||||
return fmt.Errorf("unknown db_type %s", rdr.expConnType)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
117
ers/sql_test.go
Normal file
117
ers/sql_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSQLSetURL(t *testing.T) {
|
||||
sql := new(SQLEventReader)
|
||||
expsql := &SQLEventReader{
|
||||
connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
|
||||
connType: "mysql",
|
||||
tableName: "cdrs2",
|
||||
expConnString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
|
||||
expConnType: "mysql",
|
||||
expTableName: "cdrs2",
|
||||
}
|
||||
inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
if err := sql.setURL(inURL, outURL); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if expsql.connString != sql.connString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
|
||||
} else if expsql.connType != sql.connType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
|
||||
} else if expsql.tableName != sql.tableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
|
||||
} else if expsql.expConnString != sql.expConnString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
|
||||
} else if expsql.expConnType != sql.expConnType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
|
||||
} else if expsql.expTableName != sql.expTableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
|
||||
}
|
||||
|
||||
sql = new(SQLEventReader)
|
||||
expsql = &SQLEventReader{
|
||||
connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
|
||||
connType: "postgres",
|
||||
tableName: "cdrs2",
|
||||
expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
|
||||
expConnType: "postgres",
|
||||
expTableName: "cdrs2",
|
||||
}
|
||||
inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
if err := sql.setURL(inURL, outURL); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if expsql.connString != sql.connString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
|
||||
} else if expsql.connType != sql.connType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
|
||||
} else if expsql.tableName != sql.tableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
|
||||
} else if expsql.expConnString != sql.expConnString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
|
||||
} else if expsql.expConnType != sql.expConnType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
|
||||
} else if expsql.expTableName != sql.expTableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
|
||||
}
|
||||
|
||||
sql = new(SQLEventReader)
|
||||
expsql = &SQLEventReader{
|
||||
connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
|
||||
connType: "postgres",
|
||||
tableName: "cdrs2",
|
||||
expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
|
||||
expConnType: "postgres",
|
||||
expTableName: "cdrs2",
|
||||
}
|
||||
inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
outURL = "db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
if err := sql.setURL(inURL, outURL); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if expsql.connString != sql.connString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
|
||||
} else if expsql.connType != sql.connType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
|
||||
} else if expsql.tableName != sql.tableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
|
||||
} else if expsql.expConnString != sql.expConnString {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
|
||||
} else if expsql.expConnType != sql.expConnType {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
|
||||
} else if expsql.expTableName != sql.expTableName {
|
||||
t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
|
||||
}
|
||||
|
||||
inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
outURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" {
|
||||
t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err)
|
||||
}
|
||||
inURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
|
||||
if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" {
|
||||
t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -3386,7 +3386,9 @@ func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, argDisp *utils.ArgD
|
||||
return rplyEv, utils.NewErrNotConnected(utils.AttributeS)
|
||||
}
|
||||
attrArgs := &engine.AttrArgsProcessEvent{
|
||||
Context: utils.StringPointer(utils.MetaSessionS),
|
||||
Context: utils.StringPointer(utils.FirstNonEmpty(
|
||||
utils.IfaceAsString(cgrEv.Event[utils.Context]),
|
||||
utils.MetaSessionS)),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: argDisp,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user