From 6155b0ad357fb6494cb888376363317797085fe4 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 14:58:45 +0200 Subject: [PATCH 1/3] Removed unused methods from CDRServer --- engine/cdrs.go | 76 -------------------------------------------------- 1 file changed, 76 deletions(-) diff --git a/engine/cdrs.go b/engine/cdrs.go index 504d376e6..ebb6633fb 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -258,36 +258,6 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithArgDispatcher) (*CallCost, e return cc, nil } -// attrStoExpThdStat will process a CGREvent with the configured subsystems -func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREventWithArgDispatcher, - attrS, store, allowUpdate, export, thdS, statS bool) (err error) { - if attrS { - if err = cdrS.attrSProcessEvent(cgrEv); err != nil { - return - } - } - if thdS { - go cdrS.thdSProcessEvent(cgrEv) - } - if statS { - go cdrS.statSProcessEvent(cgrEv) - } - var cdr *CDR - if cdr, err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil { - return - } - if store { - if err = cdrS.cdrDb.SetCDR(cdr, allowUpdate); err != nil { - return - } - } - if export { - go cdrS.exportCDRs([]*CDR{cdr}) - } - return -} - // rateCDRWithErr rates a CDR including errors func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithArgDispatcher) (ratedCDRs []*CDR) { var err error @@ -321,52 +291,6 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (err return } -// chrgProcessEvent will process the CGREvent with ChargerS subsystem -// it is designed to run in it's own goroutine -func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREventWithArgDispatcher, - attrS, store, allowUpdate, export, thdS, statS bool) (err error) { - var chrgrs []*ChrgSProcessEventReply - if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ChargerSConns, nil, - utils.ChargerSv1ProcessEvent, - cgrEv, &chrgrs); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", - utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) - return - } - var partExec bool - for _, chrgr := range chrgrs { - cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s converting event: %+v as CDR", - utils.CDRs, err.Error(), cgrEv)) - partExec = true - continue - } - for _, rtCDR := range cdrS.rateCDRWithErr( - &CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: cgrEv.ArgDispatcher}) { - arg := &utils.CGREventWithArgDispatcher{ - CGREvent: rtCDR.AsCGREvent(), - ArgDispatcher: cgrEv.ArgDispatcher, - } - if errProc := cdrS.attrStoExpThdStat(arg, - attrS, store, allowUpdate, export, thdS, statS); errProc != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s", - utils.CDRs, errProc.Error(), cgrEv, utils.ChargerS)) - partExec = true - continue - } - } - } - if partExec { - err = utils.ErrPartiallyExecuted - } - return -} - // chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) { var chrgrs []*ChrgSProcessEventReply From edea07e044433f216d8c79b4cf9768cdfdcc4226 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 15:57:38 +0200 Subject: [PATCH 2/3] Updated AttributeS Context --- engine/cdrs.go | 4 +++- engine/chargers.go | 6 ++++-- engine/suppliers.go | 4 +++- sessions/sessions.go | 4 +++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/engine/cdrs.go b/engine/cdrs.go index ebb6633fb..9251e0f66 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -316,7 +316,9 @@ func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { var rplyEv AttrSProcessEventReply attrArgs := &AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaCDRs), + Context: utils.StringPointer(utils.FirstNonEmpty( + utils.IfaceAsString(cgrEv.CGREvent.Event[utils.Context]), + utils.MetaCDRs)), CGREvent: cgrEv.CGREvent} if cgrEv.ArgDispatcher != nil { attrArgs.ArgDispatcher = cgrEv.ArgDispatcher diff --git a/engine/chargers.go b/engine/chargers.go index 15496bde4..b97d6c513 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -122,8 +122,10 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithArgDispatcher) ( } args := &AttrArgsProcessEvent{ - AttributeIDs: cP.AttributeIDs, - Context: utils.StringPointer(utils.MetaChargers), + AttributeIDs: cP.AttributeIDs, + Context: utils.StringPointer(utils.FirstNonEmpty( + utils.IfaceAsString(clonedEv.CGREvent.Event[utils.Context]), + utils.MetaChargers)), ProcessRuns: nil, CGREvent: clonedEv.CGREvent, ArgDispatcher: clonedEv.ArgDispatcher, diff --git a/engine/suppliers.go b/engine/suppliers.go index e72579a16..abd13dba9 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -579,7 +579,9 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted } if len(spS.cgrcfg.SupplierSCfg().AttributeSConns) != 0 { attrArgs := &AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSuppliers), + Context: utils.StringPointer(utils.FirstNonEmpty( + utils.IfaceAsString(args.CGREvent.Event[utils.Context]), + utils.MetaSuppliers)), CGREvent: args.CGREvent, ArgDispatcher: args.ArgDispatcher, } diff --git a/sessions/sessions.go b/sessions/sessions.go index 8f63d0057..1dd092ca5 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -3386,7 +3386,9 @@ func (sS *SessionS) processAttributes(cgrEv *utils.CGREvent, argDisp *utils.ArgD return rplyEv, utils.NewErrNotConnected(utils.AttributeS) } attrArgs := &engine.AttrArgsProcessEvent{ - Context: utils.StringPointer(utils.MetaSessionS), + Context: utils.StringPointer(utils.FirstNonEmpty( + utils.IfaceAsString(cgrEv.Event[utils.Context]), + utils.MetaSessionS)), CGREvent: cgrEv, ArgDispatcher: argDisp, } From 546404f4f271ac063a296900a85608d09efdbaaf Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Dec 2019 17:57:19 +0200 Subject: [PATCH 3/3] Updated ers unit tests --- ers/kafka_test.go | 75 +++++++++++++++++++++++++++++ ers/sql.go | 4 +- ers/sql_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 ers/kafka_test.go create mode 100644 ers/sql_test.go diff --git a/ers/kafka_test.go b/ers/kafka_test.go new file mode 100644 index 000000000..0eb0301ec --- /dev/null +++ b/ers/kafka_test.go @@ -0,0 +1,75 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "testing" + "time" +) + +func TestKafkaSetURL(t *testing.T) { + k := new(KafkaER) + expKafka := &KafkaER{ + dialURL: "localhost:2013", + topic: "cdrs", + groupID: "new", + maxWait: time.Second, + } + url := "localhost:2013?topic=cdrs&group_id=new&max_wait=1s" + if err := k.setURL(url); err != nil { + t.Fatal(err) + } else if expKafka.dialURL != k.dialURL { + t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) + } else if expKafka.topic != k.topic { + t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic) + } else if expKafka.groupID != k.groupID { + t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID) + } else if expKafka.maxWait != k.maxWait { + t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait) + } + k = new(KafkaER) + expKafka = &KafkaER{ + dialURL: "localhost:2013", + topic: "cgrates", + groupID: "cgrates", + maxWait: time.Millisecond, + } + url = "localhost:2013" + if err := k.setURL(url); err != nil { + t.Fatal(err) + } else if expKafka.dialURL != k.dialURL { + t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) + } else if expKafka.topic != k.topic { + t.Errorf("Expected: %s ,received: %s", expKafka.topic, k.topic) + } else if expKafka.groupID != k.groupID { + t.Errorf("Expected: %s ,received: %s", expKafka.groupID, k.groupID) + } else if expKafka.maxWait != k.maxWait { + t.Errorf("Expected: %s ,received: %s", expKafka.maxWait, k.maxWait) + } + k = new(KafkaER) + expKafka = &KafkaER{ + dialURL: "localhost:2013", + topic: "cgrates", + groupID: "cgrates", + maxWait: time.Millisecond, + } + if err := k.setURL(":"); err == nil { + t.Errorf("Expected error received: %v", err) + } +} diff --git a/ers/sql.go b/ers/sql.go index f6b4a88ce..261188169 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -260,7 +260,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { } else { outURL = strings.TrimPrefix(outURL, utils.Meta) var oURL *url.URL - if oURL, err = url.Parse(inURL); err != nil { + if oURL, err = url.Parse(outURL); err != nil { return } rdr.expConnType = oURL.Scheme @@ -292,7 +292,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) { rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", outHost, outPort, outDBname, outUser, outPassword, outSSL) default: - return fmt.Errorf("unknown db_type") + return fmt.Errorf("unknown db_type %s", rdr.expConnType) } return } diff --git a/ers/sql_test.go b/ers/sql_test.go new file mode 100644 index 000000000..ec9f41513 --- /dev/null +++ b/ers/sql_test.go @@ -0,0 +1,117 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "testing" +) + +func TestSQLSetURL(t *testing.T) { + sql := new(SQLEventReader) + expsql := &SQLEventReader{ + connString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + connType: "mysql", + tableName: "cdrs2", + expConnString: "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + expConnType: "mysql", + expTableName: "cdrs2", + } + inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + if err := sql.setURL(inURL, outURL); err != nil { + t.Fatal(err) + } else if expsql.connString != sql.connString { + t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString) + } else if expsql.connType != sql.connType { + t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType) + } else if expsql.tableName != sql.tableName { + t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName) + } else if expsql.expConnString != sql.expConnString { + t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString) + } else if expsql.expConnType != sql.expConnType { + t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType) + } else if expsql.expTableName != sql.expTableName { + t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName) + } + + sql = new(SQLEventReader) + expsql = &SQLEventReader{ + connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled", + connType: "postgres", + tableName: "cdrs2", + expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled", + expConnType: "postgres", + expTableName: "cdrs2", + } + inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + if err := sql.setURL(inURL, outURL); err != nil { + t.Fatal(err) + } else if expsql.connString != sql.connString { + t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString) + } else if expsql.connType != sql.connType { + t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType) + } else if expsql.tableName != sql.tableName { + t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName) + } else if expsql.expConnString != sql.expConnString { + t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString) + } else if expsql.expConnType != sql.expConnType { + t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType) + } else if expsql.expTableName != sql.expTableName { + t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName) + } + + sql = new(SQLEventReader) + expsql = &SQLEventReader{ + connString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled", + connType: "postgres", + tableName: "cdrs2", + expConnString: "host=127.0.0.1 port=3306 dbname=cgrates2 user=cgrates password=CGRateS.org sslmode=enabled", + expConnType: "postgres", + expTableName: "cdrs2", + } + inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + outURL = "db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + if err := sql.setURL(inURL, outURL); err != nil { + t.Fatal(err) + } else if expsql.connString != sql.connString { + t.Errorf("Expected: %q ,received: %q", expsql.connString, sql.connString) + } else if expsql.connType != sql.connType { + t.Errorf("Expected: %q ,received: %q", expsql.connType, sql.connType) + } else if expsql.tableName != sql.tableName { + t.Errorf("Expected: %q ,received: %q", expsql.tableName, sql.tableName) + } else if expsql.expConnString != sql.expConnString { + t.Errorf("Expected: %q ,received: %q", expsql.expConnString, sql.expConnString) + } else if expsql.expConnType != sql.expConnType { + t.Errorf("Expected: %q ,received: %q", expsql.expConnType, sql.expConnType) + } else if expsql.expTableName != sql.expTableName { + t.Errorf("Expected: %q ,received: %q", expsql.expTableName, sql.expTableName) + } + + inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + outURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" { + t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err) + } + inURL = "*postgres2://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2&table_name=cdrs2&sslmode=enabled" + if err := sql.setURL(inURL, outURL); err == nil || err.Error() != "unknown db_type postgres2" { + t.Errorf("Expected error: 'unknown db_type postgres2' ,received: %v", err) + } +}