diff --git a/engine/cdrs.go b/engine/cdrs.go
index 504d376e6..9251e0f66 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -258,36 +258,6 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, e
return cc, nil
}
-// attrStoExpThdStat will process a CGREvent with the configured subsystems
-func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher,
- attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
- if attrS {
- if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
- return
- }
- }
- if thdS {
- go cdrS.thdSProcessEvent(cgrEv)
- }
- if statS {
- go cdrS.statSProcessEvent(cgrEv)
- }
- var cdr *CDR
- if cdr, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
- cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
- return
- }
- if store {
- if err = cdrS.cdrDb.SetCDR(cdr, allowUpdate); err != nil {
- return
- }
- }
- if export {
- go cdrS.exportCDRs([]*CDR{cdr})
- }
- return
-}
-
// rateCDRWithErr rates a CDR including errors
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) {
var err error
@@ -321,52 +291,6 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err
return
}
-// chrgProcessEvent will process the CGREvent with ChargerS subsystem
-// it is designed to run in it's own goroutine
-func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher,
- attrS, store, allowUpdate, export, thdS, statS bool) (err error) {
- var chrgrs []*ChrgSProcessEventReply
- if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ChargerSConns, nil,
- utils.ChargerSv1ProcessEvent,
- cgrEv, &chrgrs); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
- utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
- return
- }
- var partExec bool
- for _, chrgr := range chrgrs {
- cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg,
- cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
- if err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s converting event: %+v as CDR",
- utils.CDRs, err.Error(), cgrEv))
- partExec = true
- continue
- }
- for _, rtCDR := range cdrS.rateCDRWithErr(
- &CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) {
- arg := &utils.CGREventWithArgDispatcher{
- CGREvent: rtCDR.AsCGREvent(),
- ArgDispatcher: cgrEv.ArgDispatcher,
- }
- if errProc := cdrS.attrStoExpThdStat(arg,
- attrS, store, allowUpdate, export, thdS, statS); errProc != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s",
- utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS))
- partExec = true
- continue
- }
- }
- }
- if partExec {
- err = utils.ErrPartiallyExecuted
- }
- return
-}
-
// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) {
var chrgrs []*ChrgSProcessEventReply
@@ -392,7 +316,9 @@ func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
var rplyEv AttrSProcessEventReply
attrArgs := &AttrArgsProcessEvent{
- Context: utils.StringPointer(utils.MetaCDRs),
+ Context: utils.StringPointer(utils.FirstNonEmpty(
+ utils.IfaceAsString(cgrEv.CGREvent.Event[utils.Context]),
+ utils.MetaCDRs)),
CGREvent: cgrEv.CGREvent}
if cgrEv.ArgDispatcher != nil {
attrArgs.ArgDispatcher = cgrEv.ArgDispatcher
diff --git a/engine/chargers.go b/engine/chargers.go
index 15496bde4..b97d6c513 100644
--- a/engine/chargers.go
+++ b/engine/chargers.go
@@ -122,8 +122,10 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithArgDispatcher) (
}
args := &AttrArgsProcessEvent{
- AttributeIDs: cP.AttributeIDs,
- Context: utils.StringPointer(utils.MetaChargers),
+ AttributeIDs: cP.AttributeIDs,
+ Context: utils.StringPointer(utils.FirstNonEmpty(
+ utils.IfaceAsString(clonedEv.CGREvent.Event[utils.Context]),
+ utils.MetaChargers)),
ProcessRuns: nil,
CGREvent: clonedEv.CGREvent,
ArgDispatcher: clonedEv.ArgDispatcher,
diff --git a/engine/suppliers.go b/engine/suppliers.go
index e72579a16..abd13dba9 100644
--- a/engine/suppliers.go
+++ b/engine/suppliers.go
@@ -579,7 +579,9 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted
}
if len(spS.cgrcfg.SupplierSCfg().AttributeSConns) != 0 {
attrArgs := &AttrArgsProcessEvent{
- Context: utils.StringPointer(utils.MetaSuppliers),
+ Context: utils.StringPointer(utils.FirstNonEmpty(
+ utils.IfaceAsString(args.CGREvent.Event[utils.Context]),
+ utils.MetaSuppliers)),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
diff --git a/ers/kafka_test.go b/ers/kafka_test.go
new file mode 100644
index 000000000..0eb0301ec
--- /dev/null
+++ b/ers/kafka_test.go
@@ -0,0 +1,75 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package ers
+
+import (
+ "testing"
+ "time"
+)
+
+func TestKafkaSetURL(t *testing.T) {
+ k := new(KafkaER)
+ expKafka := &KafkaER{
+ dialURL: "localhost:2013",
+ topic: "cdrs",
+ groupID: "new",
+ maxWait: time.Second,
+ }
+ url := "localhost:2013?topic=cdrs&group_id=new&max_wait=1s"
+ if err := k.setURL(url); err != nil {
+ t.Fatal(err)
+ } else if expKafka.dialURL != k.dialURL {
+ t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
+ } else if expKafka.topic != k.topic {
+ t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic)
+ } else if expKafka.groupID != k.groupID {
+ t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID)
+ } else if expKafka.maxWait != k.maxWait {
+ t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
+ }
+ k = new(KafkaER)
+ expKafka = &KafkaER{
+ dialURL: "localhost:2013",
+ topic: "cgrates",
+ groupID: "cgrates",
+ maxWait: time.Millisecond,
+ }
+ url = "localhost:2013"
+ if err := k.setURL(url); err != nil {
+ t.Fatal(err)
+ } else if expKafka.dialURL != k.dialURL {
+ t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL)
+ } else if expKafka.topic != k.topic {
+ t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic)
+ } else if expKafka.groupID != k.groupID {
+ t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID)
+ } else if expKafka.maxWait != k.maxWait {
+ t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait)
+ }
+ k = new(KafkaER)
+ expKafka = &KafkaER{
+ dialURL: "localhost:2013",
+ topic: "cgrates",
+ groupID: "cgrates",
+ maxWait: time.Millisecond,
+ }
+ if err := k.setURL(":"); err == nil {
+ t.Errorf("Expected error received: %v", err)
+ }
+}
diff --git a/ers/sql.go b/ers/sql.go
index f6b4a88ce..261188169 100644
--- a/ers/sql.go
+++ b/ers/sql.go
@@ -260,7 +260,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
} else {
outURL = strings.TrimPrefix(outURL, utils.Meta)
var oURL *url.URL
- if oURL, err = url.Parse(inURL); err != nil {
+ if oURL, err = url.Parse(outURL); err != nil {
return
}
rdr.expConnType = oURL.Scheme
@@ -292,7 +292,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
outHost, outPort, outDBname, outUser, outPassword, outSSL)
default:
- return fmt.Errorf("unknown db_type")
+ return fmt.Errorf("unknown db_type %s", rdr.expConnType)
}
return
}
diff --git a/ers/sql_test.go b/ers/sql_test.go
new file mode 100644
index 000000000..ec9f41513
--- /dev/null
+++ b/ers/sql_test.go
@@ -0,0 +1,117 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package ers
+
+import (
+ "testing"
+)
+
+func TestSQLSetURL(t *testing.T) {
+ sql := new(SQLEventReader)
+ expsql := &SQLEventReader{
+ connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
+ connType: "mysql",
+ tableName: "cdrs2",
+ expConnString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
+ expConnType: "mysql",
+ expTableName: "cdrs2",
+ }
+ inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ if err := sql.setURL(inURL, outURL); err != nil {
+ t.Fatal(err)
+ } else if expsql.connString != sql.connString {
+ t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
+ } else if expsql.connType != sql.connType {
+ t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
+ } else if expsql.tableName != sql.tableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
+ } else if expsql.expConnString != sql.expConnString {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
+ } else if expsql.expConnType != sql.expConnType {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
+ } else if expsql.expTableName != sql.expTableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
+ }
+
+ sql = new(SQLEventReader)
+ expsql = &SQLEventReader{
+ connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
+ connType: "postgres",
+ tableName: "cdrs2",
+ expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
+ expConnType: "postgres",
+ expTableName: "cdrs2",
+ }
+ inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ if err := sql.setURL(inURL, outURL); err != nil {
+ t.Fatal(err)
+ } else if expsql.connString != sql.connString {
+ t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
+ } else if expsql.connType != sql.connType {
+ t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
+ } else if expsql.tableName != sql.tableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
+ } else if expsql.expConnString != sql.expConnString {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
+ } else if expsql.expConnType != sql.expConnType {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
+ } else if expsql.expTableName != sql.expTableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
+ }
+
+ sql = new(SQLEventReader)
+ expsql = &SQLEventReader{
+ connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
+ connType: "postgres",
+ tableName: "cdrs2",
+ expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled",
+ expConnType: "postgres",
+ expTableName: "cdrs2",
+ }
+ inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ outURL = "db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ if err := sql.setURL(inURL, outURL); err != nil {
+ t.Fatal(err)
+ } else if expsql.connString != sql.connString {
+ t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString)
+ } else if expsql.connType != sql.connType {
+ t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType)
+ } else if expsql.tableName != sql.tableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName)
+ } else if expsql.expConnString != sql.expConnString {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString)
+ } else if expsql.expConnType != sql.expConnType {
+ t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType)
+ } else if expsql.expTableName != sql.expTableName {
+ t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName)
+ }
+
+ inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ outURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" {
+ t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err)
+ }
+ inURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled"
+ if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" {
+ t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err)
+ }
+}
diff --git a/sessions/sessions.go b/sessions/sessions.go
index 8f63d0057..1dd092ca5 100644
--- a/sessions/sessions.go
+++ b/sessions/sessions.go
@@ -3386,7 +3386,9 @@ func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, argDisp *utils.ArgD
return rplyEv, utils.NewErrNotConnected(utils.AttributeS)
}
attrArgs := &engine.AttrArgsProcessEvent{
- Context: utils.StringPointer(utils.MetaSessionS),
+ Context: utils.StringPointer(utils.FirstNonEmpty(
+ utils.IfaceAsString(cgrEv.Event[utils.Context]),
+ utils.MetaSessionS)),
CGREvent: cgrEv,
ArgDispatcher: argDisp,
}