From 1d67bca260366530519b81ef8b4262104d2641ad Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 1 Apr 2021 09:35:23 +0300 Subject: [PATCH] Updated tests after remove --- agents/astagent.go | 1 - agents/dnsagent.go | 1 - cmd/cgr-engine/cgr-engine.go | 14 ++----- console/command.go | 2 +- console/ping.go | 2 - data/tariffplans/testit/ActionProfiles.csv | 8 ---- .../tariffplans/tutactions/ActionProfiles.csv | 8 ---- dispatchers/libdispatcher.go | 3 -- ees/ees.go | 2 +- ees/ees_test.go | 4 ++ ees/elastic.go | 4 +- ees/filecsv.go | 1 - ees/filefwv.go | 1 - ees/httpjsonmap.go | 4 +- ees/httppost.go | 4 +- ees/posterjsonmap.go | 1 - ees/virtualee.go | 4 +- ers/amqpv1.go | 8 ++-- ers/filecsv.go | 2 +- ers/filefwv.go | 2 +- ers/filejson.go | 2 +- ers/filexml.go | 2 +- ers/flatstore.go | 2 +- ers/kafka.go | 17 ++++---- ers/partial_csv.go | 2 +- ers/s3.go | 1 + ers/s3_it_test.go | 2 + ers/sqs.go | 1 + ers/sqs_it_test.go | 1 + loaders/loader.go | 1 - migrator/accounts.go | 17 ++++---- migrator/action.go | 17 +------- migrator/action_plan.go | 3 -- migrator/action_trigger.go | 1 - migrator/attributes.go | 24 +---------- migrator/cdrs.go | 14 ------- migrator/derived_chargers.go | 12 ++---- migrator/filters.go | 14 ++----- migrator/migrator_utils.go | 42 ++++++++----------- migrator/stats.go | 2 - migrator/storage_map_datadb.go | 6 +-- migrator/storage_map_stordb.go | 6 +-- migrator/storage_sql.go | 3 +- migrator/thresholds.go | 2 - migrator/tp_shared_groups.go | 2 +- migrator/tp_timings.go | 2 +- registrarc/registrarc.go | 2 - sessions/sessions.go | 8 ---- structmatcher/structmatcher.go | 4 -- utils/consts.go | 8 ---- 50 files changed, 77 insertions(+), 219 deletions(-) delete mode 100644 data/tariffplans/testit/ActionProfiles.csv delete mode 100644 data/tariffplans/tutactions/ActionProfiles.csv diff --git a/agents/astagent.go b/agents/astagent.go index 1ee53acad..bdd4763e7 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -147,7 +147,6 @@ func (sma *AsteriskAgent) hangupChannel(channelID, warnMsg string) { fmt.Sprintf("<%s> failed disconnecting channel <%s>, err: %s", utils.AsteriskAgent, channelID, err.Error())) } - return } func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { diff --git a/agents/dnsagent.go b/agents/dnsagent.go index f2d8a0b00..1aa9069b6 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -167,7 +167,6 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { rply.Rcode = dns.RcodeServerFailure dnsWriteMsg(w, rply) } - return } func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor, diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 483f28af6..ee6ee6515 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -144,7 +144,7 @@ func startRPC(server *cores.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, - internalEEsChan, internalRateSChan, internalActionSChan, + internalEEsChan, internalRateSChan, internalAccountSChan chan rpcclient.ClientConnector, shdChan *utils.SyncedChan) { if !cfg.DispatcherSCfg().Enabled { @@ -179,8 +179,6 @@ func startRPC(server *cores.Server, internalRaterChan, internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS - case actionS := <-internalActionSChan: - internalActionSChan <- actionS case accountS := <-internalAccountSChan: internalAccountSChan <- accountS case <-shdChan.Done(): @@ -319,8 +317,8 @@ func stopCPUProfiling(f io.Closer) { } func singnalHandler(shdWg *sync.WaitGroup, shdChan *utils.SyncedChan) { - shutdownSignal := make(chan os.Signal) - reloadSignal := make(chan os.Signal) + shutdownSignal := make(chan os.Signal, 1) + reloadSignal := make(chan os.Signal, 1) signal.Notify(shutdownSignal, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) signal.Notify(reloadSignal, syscall.SIGHUP) @@ -499,7 +497,6 @@ func main() { internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) internalEEsChan := make(chan rpcclient.ClientConnector, 1) internalRateSChan := make(chan rpcclient.ClientConnector, 1) - internalActionSChan := make(chan rpcclient.ClientConnector, 1) internalAccountSChan := make(chan rpcclient.ClientConnector, 1) // initialize the connManager before creating the DMService @@ -526,7 +523,6 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, - utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): internalActionSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): internalAccountSChan, @@ -565,7 +561,6 @@ func main() { utils.StatS: new(sync.WaitGroup), utils.StorDB: new(sync.WaitGroup), utils.ThresholdS: new(sync.WaitGroup), - utils.ActionS: new(sync.WaitGroup), utils.AccountS: new(sync.WaitGroup), } gvService := services.NewGlobalVarS(cfg, srvDep) @@ -710,7 +705,6 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan) engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan) - engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan) engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan) engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) engine.IntRPC.AddInternalRPCClient(utils.AccountSv1, internalAccountSChan) @@ -727,7 +721,7 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, + internalCacheSChan, internalEEsChan, internalRateSChan, internalAccountSChan, shdChan) <-shdChan.Done() diff --git a/console/command.go b/console/command.go index 7907aa527..3b6deca66 100644 --- a/console/command.go +++ b/console/command.go @@ -49,7 +49,7 @@ func GetCommands() map[string]Commander { func getAvailabelCommandsErr() error { var keys []string - for key, _ := range commands { + for key := range commands { keys = append(keys, key) } return fmt.Errorf("\n\tAvailable commands <%s>\n", strings.Join(keys, "|")) diff --git a/console/ping.go b/console/ping.go index 5069f4045..79138a483 100644 --- a/console/ping.go +++ b/console/ping.go @@ -85,8 +85,6 @@ func (self *CmdApierPing) RpcMethod() string { return utils.RateSv1Ping case utils.AccountSLow: return utils.AccountSv1Ping - case utils.ActionSLow: - return utils.ActionSv1Ping default: } return self.rpcMethod diff --git a/data/tariffplans/testit/ActionProfiles.csv b/data/tariffplans/testit/ActionProfiles.csv deleted file mode 100644 index 73af7c414..000000000 --- a/data/tariffplans/testit/ActionProfiles.csv +++ /dev/null @@ -1,8 +0,0 @@ -#Tenant,ID,FilterIDs,ActivationInterval,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue -cgrates.org,ONE_TIME_ACT,,,10,*asap,*accounts,1001;1002,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10 -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_DATA,,false,0s,*set_balance,,*balance.TestDataBalance.Type,*data -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_TEST_DATA,,false,0s,*add_balance,,*balance.TestDataBalance.Units,1024 -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_VOICE,,false,0s,*set_balance,,*balance.TestVoiceBalance.Type,*voice -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_TEST_VOICE,,false,0s,*add_balance,,*balance.TestVoiceBalance.Units,15m15s -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_FILTERS,,false,0s,*set_balance,,*balance.TestVoiceBalance.Filters,*string:~*req.CustomField:500 -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_REM_VOICE,,false,0s,*rem_balance,,TestVoiceBalance2, \ No newline at end of file diff --git a/data/tariffplans/tutactions/ActionProfiles.csv b/data/tariffplans/tutactions/ActionProfiles.csv deleted file mode 100644 index ddf8906f9..000000000 --- a/data/tariffplans/tutactions/ActionProfiles.csv +++ /dev/null @@ -1,8 +0,0 @@ -#Tenant,ID,FilterIDs,ActivationInterval,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue -cgrates.org,ONE_TIME_ACT,,,10,*asap,*accounts,1001;1002,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10 -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_DATA,,false,0s,*set_balance,,*balance.TestDataBalance.Type,*data -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_TEST_DATA,,false,0s,*add_balance,,*balance.TestDataBalance.Units,1024 -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_VOICE,,false,0s,*set_balance,,*balance.TestVoiceBalance.Type,*voice -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_TEST_VOICE,,false,0s,*add_balance,,*balance.TestVoiceBalance.Units,15m15s -cgrates.org,ONE_TIME_ACT,,,,,,,SET_BALANCE_TEST_FILTERS,,false,0s,*set_balance,,*balance.TestVoiceBalance.Filters,*string:~*req.CustomField:500 -cgrates.org,ONE_TIME_ACT,,,,,,,TOPUP_REM_VOICE,,false,0s,*rem_balance,,TestVoiceBalance2, diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go index 44317e270..62f388f4b 100644 --- a/dispatchers/libdispatcher.go +++ b/dispatchers/libdispatcher.go @@ -125,7 +125,6 @@ func (wd *WeightDispatcher) SetProfile(pfl *engine.DispatcherProfile) { pfl.Hosts.Sort() wd.hosts = pfl.Hosts.Clone() // avoid concurrency on profile wd.Unlock() - return } // HostIDs used to implement Dispatcher interface @@ -158,7 +157,6 @@ func (d *RandomDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() d.hosts = pfl.Hosts.Clone() d.Unlock() - return } // HostIDs used to implement Dispatcher interface @@ -192,7 +190,6 @@ func (d *RoundRobinDispatcher) SetProfile(pfl *engine.DispatcherProfile) { d.Lock() d.hosts = pfl.Hosts.Clone() d.Unlock() - return } // HostIDs used to implement Dispatcher interface diff --git a/ees/ees.go b/ees/ees.go index df1390bd0..6b90874fa 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -125,7 +125,7 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []st eeS.cfg.EEsNoLksCfg().AttributeSConns, nil, utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { - cgrEv = rplyEv.CGREvent + *cgrEv = *rplyEv.CGREvent } else if err != nil && err.Error() == utils.ErrNotFound.Error() { err = nil // cancel ErrNotFound diff --git a/ees/ees_test.go b/ees/ees_test.go index 0de421e3b..191055ef9 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -102,6 +102,7 @@ func TestAttrSProcessEvent(t *testing.T) { utils.AttributeSv1ProcessEvent: func(args, reply interface{}) error { rplyEv := &engine.AttrSProcessEventReply{ AlteredFields: []string{"testcase"}, + CGREvent: &utils.CGREvent{Event: map[string]interface{}{"testcase": 1}}, } *reply.(*engine.AttrSProcessEventReply) = *rplyEv return nil @@ -125,8 +126,11 @@ func TestAttrSProcessEvent(t *testing.T) { }) eeS := NewEventExporterS(cgrCfg, filterS, connMgr) // cgrEv := &utils.CGREvent{} + exp := &utils.CGREvent{Event: map[string]interface{}{"testcase": 1}} if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { t.Error(err) + } else if !reflect.DeepEqual(exp, cgrEv) { + t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(cgrEv)) } } diff --git a/ees/elastic.go b/ees/elastic.go index 7c5897c63..e43b28b41 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -118,9 +118,7 @@ func (eEe *ElasticEe) ID() string { } // OnEvicted implements EventExporter, doing the cleanup before exit -func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) { - return -} +func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {} // ExportEvent implements EventExporter func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { diff --git a/ees/filecsv.go b/ees/filecsv.go index d4fff06a4..0e1bd5205 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -88,7 +88,6 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EventExporterS, fCsv.id, err.Error())) } - return } // ExportEvent implements EventExporter diff --git a/ees/filefwv.go b/ees/filefwv.go index ffc5945f1..cd989b64f 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -78,7 +78,6 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EventExporterS, fFwv.id, err.Error())) } - return } // ExportEvent implements EventExporter diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 99970bfce..cca819ff1 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -63,9 +63,7 @@ func (httpEE *HTTPjsonMapEE) ID() string { } // OnEvicted implements EventExporter, doing the cleanup before exit -func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) { - return -} +func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {} // ExportEvent implements EventExporter func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { diff --git a/ees/httppost.go b/ees/httppost.go index b411950a1..96069e0dd 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -57,9 +57,7 @@ func (httpPost *HTTPPost) ID() string { } // OnEvicted implements EventExporter, doing the cleanup before exit -func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) { - return -} +func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {} // ExportEvent implements EventExporter func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { diff --git a/ees/posterjsonmap.go b/ees/posterjsonmap.go index 28be1d499..54dada8f1 100644 --- a/ees/posterjsonmap.go +++ b/ees/posterjsonmap.go @@ -76,7 +76,6 @@ func (pstrEE *PosterJSONMapEE) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) { pstrEE.poster.Close() - return } // ExportEvent implements EventExporter diff --git a/ees/virtualee.go b/ees/virtualee.go index 8e8f6168e..a1d4c5d3d 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -55,9 +55,7 @@ func (vEe *VirtualEe) ID() string { } // OnEvicted implements EventExporter, doing the cleanup before exit -func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) { - return -} +func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {} // ExportEvent implements EventExporter func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 9510585f6..0577bc159 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -101,11 +101,9 @@ func (rdr *AMQPv1ER) Serve() (err error) { return } go func() { - select { - case <-rdr.rdrExit: - receiver.Close(context.Background()) - rdr.close() - } + <-rdr.rdrExit + receiver.Close(context.Background()) + rdr.close() }() go rdr.readLoop(receiver) // read until the connection is closed diff --git a/ers/filecsv.go b/ers/filecsv.go index 61a024310..f3300125b 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -201,6 +201,6 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } diff --git a/ers/filefwv.go b/ers/filefwv.go index 115c4c957..c699b8c6c 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -236,7 +236,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } diff --git a/ers/filejson.go b/ers/filejson.go index c68463d1a..8cc86d8c2 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -183,6 +183,6 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Events posted: %d, run duration: %s", - utils.ERs, absPath, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, evsPosted, time.Since(timeStart))) return } diff --git a/ers/filexml.go b/ers/filexml.go index a05420e9d..70666d085 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -181,6 +181,6 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } diff --git a/ers/flatstore.go b/ers/flatstore.go index 8c5d53f31..663be0317 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -231,7 +231,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } diff --git a/ers/kafka.go b/ers/kafka.go index 0e5ed13b8..fe066771c 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -99,17 +99,14 @@ func (rdr *KafkaER) Serve() (err error) { } go func(r *kafka.Reader) { // use a secondary gorutine because the ReadMessage is blocking function - select { - case <-rdr.rdrExit: - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring kafka path <%s>", - utils.ERs, rdr.dialURL)) - if rdr.poster != nil { - rdr.poster.Close() - } - r.Close() // already locked in library - return + <-rdr.rdrExit + utils.Logger.Info( + fmt.Sprintf("<%s> stop monitoring kafka path <%s>", + utils.ERs, rdr.dialURL)) + if rdr.poster != nil { + rdr.poster.Close() } + r.Close() // already locked in library }(r) go rdr.readLoop(r) // read until the connection is closed return diff --git a/ers/partial_csv.go b/ers/partial_csv.go index a40dd4121..70c693d8e 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -283,7 +283,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { utils.Logger.Info( fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) + utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) return } diff --git a/ers/s3.go b/ers/s3.go index db95aa690..f3fe6a5e9 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -54,6 +54,7 @@ func NewS3ER(cfg *config.CGRConfig, cfgIdx int, } } rdr.parseOpts(rdr.Config().Opts) + rdr.createPoster() return rdr, nil } diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index bd9b6d3d6..c4290d7cb 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -183,6 +183,7 @@ func TestNewS3ER(t *testing.T) { if err != nil { t.Fatal(err) } + rdr.(*S3ER).poster = nil if !reflect.DeepEqual(rdr, expected) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, rdr) } @@ -227,6 +228,7 @@ func TestNewS3ERCase2(t *testing.T) { if err != nil { t.Fatal(err) } + rdr.(*S3ER).poster = nil expected.cap = rdr.(*S3ER).cap if !reflect.DeepEqual(rdr, expected) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, rdr) diff --git a/ers/sqs.go b/ers/sqs.go index 5c80f6b8f..98e10f9e3 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -54,6 +54,7 @@ func NewSQSER(cfg *config.CGRConfig, cfgIdx int, } } rdr.parseOpts(rdr.Config().Opts) + rdr.createPoster() return rdr, nil } diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go index 765edd648..3b67481d6 100644 --- a/ers/sqs_it_test.go +++ b/ers/sqs_it_test.go @@ -160,6 +160,7 @@ func TestNewSQSER(t *testing.T) { } expected.cap = rdr.(*SQSER).cap expected.session = rdr.(*SQSER).session + rdr.(*SQSER).poster = nil if !reflect.DeepEqual(rdr, expected) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, rdr) } diff --git a/loaders/loader.go b/loaders/loader.go index b1f6a2134..3e9372e37 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -97,7 +97,6 @@ type Loader struct { dataTpls map[string][]*config.FCTemplate // map[loaderType]*config.FCTemplate flagsTpls map[string]utils.FlagsWithParams //map[loaderType]utils.FlagsWithParams rdrs map[string]map[string]*openedCSVFile // map[loaderType]map[fileName]*openedCSVFile for common incremental read - procRows int // keep here the last processed row in the file/-s bufLoaderData map[string][]LoaderData // cache of data read, indexed on tenantID dm *engine.DataManager timezone string diff --git a/migrator/accounts.go b/migrator/accounts.go index b596f6fb1..57425db4b 100644 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -232,13 +232,12 @@ type v1UnitsCounter struct { } type v2Account struct { - ID string - BalanceMap map[string]engine.Balances - UnitCounters engine.UnitCounters - ActionTriggers engine.ActionTriggers - AllowNegative bool - Disabled bool - executingTriggers bool + ID string + BalanceMap map[string]engine.Balances + UnitCounters engine.UnitCounters + ActionTriggers engine.ActionTriggers + AllowNegative bool + Disabled bool } func (b *v1Balance) IsDefault() bool { @@ -248,7 +247,7 @@ func (b *v1Balance) IsDefault() bool { b.ExpirationDate.IsZero() && b.SharedGroup == "" && b.Weight == 0 && - b.Disabled == false + !b.Disabled } func (v1Acc v1Account) V1toV3Account() (ac *engine.Account) { @@ -331,7 +330,7 @@ func (v1Acc v1Account) V1toV3Account() (ac *engine.Account) { if oldUcBal.TimingIDs != "" { bf.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(oldUcBal.TimingIDs)) } - if oldUcBal.Disabled != false { + if oldUcBal.Disabled { bf.Disabled = utils.BoolPointer(oldUcBal.Disabled) } bf.Timings = oldUcBal.Timings diff --git a/migrator/action.go b/migrator/action.go index e9827c953..2695c0284 100644 --- a/migrator/action.go +++ b/migrator/action.go @@ -62,21 +62,6 @@ func (m *Migrator) migrateCurrentActions() (err error) { return } -func (m *Migrator) removeV1Actions() (err error) { - var v1 *v1Actions - for { - if v1, err = m.dmIN.getV1Actions(); err != nil && err != utils.ErrNoMoreData { - return err - } - if v1 == nil { - return nil - } - if err = m.dmIN.remV1Actions(*v1); err != nil { - return err - } - } -} - func (m *Migrator) migrateV1Actions() (acts engine.Actions, err error) { var v1ACs *v1Actions @@ -191,7 +176,7 @@ func (v1Act v1Action) AsAction() (act *engine.Action) { if v1Act.Balance.Weight != 0 { bf.Weight = utils.Float64Pointer(v1Act.Balance.Weight) } - if v1Act.Balance.Disabled != false { + if v1Act.Balance.Disabled { bf.Disabled = utils.BoolPointer(v1Act.Balance.Disabled) } if !v1Act.Balance.ExpirationDate.IsZero() { diff --git a/migrator/action_plan.go b/migrator/action_plan.go index 3e4b22bf2..da652309c 100644 --- a/migrator/action_plan.go +++ b/migrator/action_plan.go @@ -21,7 +21,6 @@ package migrator import ( "fmt" "strings" - "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -34,8 +33,6 @@ type v1ActionPlan struct { Timing *engine.RateInterval Weight float64 ActionsId string - actions v1Actions - stCache time.Time // cached time of the next start } type v1ActionPlans []*v1ActionPlan diff --git a/migrator/action_trigger.go b/migrator/action_trigger.go index cad32ebea..6a3f9452f 100644 --- a/migrator/action_trigger.go +++ b/migrator/action_trigger.go @@ -47,7 +47,6 @@ type v1ActionTrigger struct { ActionsId string MinQueuedItems int // Trigger actions only if this number is hit (stats only) Executed bool - lastExecutionTime time.Time } type v1ActionTriggers []*v1ActionTrigger diff --git a/migrator/attributes.go b/migrator/attributes.go index 3bc633c48..4e381693a 100644 --- a/migrator/attributes.go +++ b/migrator/attributes.go @@ -75,26 +75,6 @@ func (m *Migrator) migrateCurrentAttributeProfile() (err error) { return } -// migrateV1ToV2Attributes migrates attributeProfile from v1 to v2 -// for the moment the system in using the shortcut from v1 to v4 -func (m *Migrator) migrateV1ToV2Attributes() (v2Attr *v2AttributeProfile, err error) { - var v1Attr *v1AttributeProfile - - v1Attr, err = m.dmIN.getV1AttributeProfile() - if err != nil { - return nil, err - } else if v1Attr == nil { - return nil, errors.New("Attribute NIL") - } - - v2Attr, err = v1Attr.AsAttributeProfile() - if err != nil { - return nil, err - } - - return -} - func (m *Migrator) migrateV1ToV4AttributeProfile() (v4Attr *v4AttributeProfile, err error) { var v1Attr *v1AttributeProfile v1Attr, err = m.dmIN.getV1AttributeProfile() @@ -306,7 +286,7 @@ func (v2AttrPrf v2AttributeProfile) AsAttributeProfile() (attrPrf *v3AttributePr for _, attr := range v2AttrPrf.Attributes { filterIDs := make([]string, 0) //append false translate to if FieldName exist do stuff - if attr.Append == false { + if !attr.Append { filterIDs = append(filterIDs, utils.MetaExists+utils.InInFieldSep+attr.FieldName+utils.InInFieldSep) } //Initial not *any translate to if value of fieldName = initial do stuff @@ -339,7 +319,7 @@ func (v1AttrPrf v1AttributeProfile) AsAttributeProfileV1To4() (attrPrf *v4Attrib // Create FilterIDs []string filterIDs := make([]string, 0) //append false translate to if FieldName exist do stuff - if attr.Append == false { + if !attr.Append { filterIDs = append(filterIDs, utils.MetaExists+utils.ConcatenatedKeySep+attr.FieldName+utils.ConcatenatedKeySep) } //Initial not *any translate to if value of fieldName = initial do stuff diff --git a/migrator/cdrs.go b/migrator/cdrs.go index 9fe05fdc4..655eafb2c 100644 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -95,20 +95,6 @@ func (m *Migrator) migrateCDRs() (err error) { return m.ensureIndexesStorDB(engine.ColCDRs) } -func (m *Migrator) removeV1CDRs() (err error) { - var v1CDR *v1Cdrs - if v1CDR, err = m.storDBIn.getV1CDR(); err != nil { - return err - } - if v1CDR == nil { - return - } - if err = m.storDBIn.remV1CDRs(v1CDR); err != nil { - return - } - return -} - func (m *Migrator) migrateV1CDRs() (cdr *engine.CDR, err error) { var v1CDR *v1Cdrs if v1CDR, err = m.storDBIn.getV1CDR(); err != nil { diff --git a/migrator/derived_chargers.go b/migrator/derived_chargers.go index 96e7ef107..9c028e4a2 100644 --- a/migrator/derived_chargers.go +++ b/migrator/derived_chargers.go @@ -74,9 +74,7 @@ func fieldinfo2Attribute(attr []*engine.Attribute, fieldName, fieldInfo string) if fieldInfo == utils.MetaDefault || len(fieldInfo) == 0 { return attr } - if strings.HasPrefix(fieldInfo, utils.StaticValuePrefix) { - fieldInfo = fieldInfo[1:] - } + fieldInfo = strings.TrimPrefix(fieldInfo, utils.StaticValuePrefix) var err error if rp, err = config.NewRSRParsers(fieldInfo, utils.InfieldSep); err != nil { utils.Logger.Err(fmt.Sprintf("On Migrating rule: <%s>, error: %s", fieldInfo, err.Error())) @@ -136,12 +134,8 @@ func derivedChargers2Charger(dc *v1DerivedCharger, tenant string, key string, fi filter := dc.RunFilters if len(filter) != 0 { - if strings.HasPrefix(filter, utils.StaticValuePrefix) { - filter = filter[1:] - } - if strings.HasPrefix(filter, utils.DynamicDataPrefix) { - filter = filter[1:] - } + filter = strings.TrimPrefix(filter, utils.StaticValuePrefix) + filter = strings.TrimPrefix(filter, utils.DynamicDataPrefix) flt, err := migrateInlineFilterV4([]string{"*rsr::" + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + filter}) if err != nil { return diff --git a/migrator/filters.go b/migrator/filters.go index b4079abda..09c41b813 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -23,7 +23,6 @@ import ( "regexp" "strings" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -115,10 +114,7 @@ func migrateFilterV2(fl *v1Filter) (fltr *engine.Filter) { fltr.Rules[i].Element = utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + rule.FieldName } else { for idx, val := range rule.Values { - if strings.HasPrefix(val, utils.DynamicDataPrefix) { - // remove dynamic data prefix from fieldName - val = val[1:] - } + val = strings.TrimPrefix(val, utils.DynamicDataPrefix) // remove dynamic data prefix from fieldName fltr.Rules[i].Values[idx] = utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + val } } @@ -803,11 +799,9 @@ type v1Filter struct { } type v1FilterRule struct { - Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte) - FieldName string // Name of the field providing us the Values to check (used in case of some ) - Values []string // Filter definition - rsrFields config.RSRParsers // Cache here the RSRFilter Values - negative *bool + Type string // Filter type (*string, *timing, *rsr_filters, *stats, *lt, *lte, *gt, *gte) + FieldName string // Name of the field providing us the Values to check (used in case of some ) + Values []string // Filter definition } func (m *Migrator) migrateRequestFilterV4(v4Fltr *engine.Filter) (fltr *engine.Filter, err error) { diff --git a/migrator/migrator_utils.go b/migrator/migrator_utils.go index 1281718af..a9acfe71e 100644 --- a/migrator/migrator_utils.go +++ b/migrator/migrator_utils.go @@ -34,56 +34,48 @@ var ( func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string, cacheCfg *config.CacheCfg, opts map[string]interface{}) (db MigratorDataDB, err error) { - dbCon, err := engine.NewDataDBConn(db_type, host, - port, name, user, pass, marshaler, opts) - if err != nil { - return nil, err + var dbCon engine.DataDB + if dbCon, err = engine.NewDataDBConn(db_type, host, + port, name, user, pass, marshaler, opts); err != nil { + return } dm := engine.NewDataManager(dbCon, cacheCfg, nil) - var d MigratorDataDB switch db_type { case utils.Redis: - d = newRedisMigrator(dm) + db = newRedisMigrator(dm) case utils.Mongo: - d = newMongoMigrator(dm) - db = d.(MigratorDataDB) + db = newMongoMigrator(dm) case utils.INTERNAL: - d = newInternalMigrator(dm) - db = d.(MigratorDataDB) + db = newInternalMigrator(dm) default: err = fmt.Errorf("unknown db '%s' valid options are '%s' or '%s or '%s'", db_type, utils.Redis, utils.Mongo, utils.INTERNAL) } - return d, nil + return } func NewMigratorStorDB(db_type, host, port, name, user, pass, marshaler string, stringIndexedFields, prefixIndexedFields []string, opts map[string]interface{}) (db MigratorStorDB, err error) { - var d MigratorStorDB - storDb, err := engine.NewStorDBConn(db_type, host, port, name, user, - pass, marshaler, stringIndexedFields, prefixIndexedFields, opts) - if err != nil { - return nil, err + var storDb engine.StorDB + if storDb, err = engine.NewStorDBConn(db_type, host, port, name, user, + pass, marshaler, stringIndexedFields, prefixIndexedFields, opts); err != nil { + return } switch db_type { case utils.Mongo: - d = newMongoStorDBMigrator(storDb) - db = d.(MigratorStorDB) + db = newMongoStorDBMigrator(storDb) case utils.MySQL: - d = newMigratorSQL(storDb) - db = d.(MigratorStorDB) + db = newMigratorSQL(storDb) case utils.Postgres: - d = newMigratorSQL(storDb) - db = d.(MigratorStorDB) + db = newMigratorSQL(storDb) case utils.INTERNAL: - d = newInternalStorDBMigrator(storDb) - db = d.(MigratorStorDB) + db = newInternalStorDBMigrator(storDb) default: err = fmt.Errorf("Unknown db '%s' valid options are [%s, %s, %s, %s]", db_type, utils.MySQL, utils.Mongo, utils.Postgres, utils.INTERNAL) } - return d, nil + return } func (m *Migrator) getVersions(str string) (vrs engine.Versions, err error) { if str == utils.CDRs || str == utils.SessionSCosts || strings.HasPrefix(str, "Tp") { diff --git a/migrator/stats.go b/migrator/stats.go index bb476213c..032a4131c 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -58,8 +58,6 @@ type v1Stat struct { Triggers engine.ActionTriggers } -type v1Stats []*v1Stat - func (m *Migrator) moveStatQueueProfile() (err error) { //StatQueueProfile var ids []string diff --git a/migrator/storage_map_datadb.go b/migrator/storage_map_datadb.go index 0a55ac0ff..6ca4489e9 100644 --- a/migrator/storage_map_datadb.go +++ b/migrator/storage_map_datadb.go @@ -24,10 +24,8 @@ import ( ) type internalMigrator struct { - dm *engine.DataManager - iDB *engine.InternalDB - dataKeys []string - qryIdx *int + dm *engine.DataManager + iDB *engine.InternalDB } func newInternalMigrator(dm *engine.DataManager) (iDBMig *internalMigrator) { diff --git a/migrator/storage_map_stordb.go b/migrator/storage_map_stordb.go index 55ab681be..772fac967 100644 --- a/migrator/storage_map_stordb.go +++ b/migrator/storage_map_stordb.go @@ -31,10 +31,8 @@ func newInternalStorDBMigrator(stor engine.StorDB) (iDBMig *internalStorDBMigrat } type internalStorDBMigrator struct { - storDB *engine.StorDB - iDB *engine.InternalDB - dataKeys []string - qryIdx *int + storDB *engine.StorDB + iDB *engine.InternalDB } func (iDBMig *internalStorDBMigrator) close() {} diff --git a/migrator/storage_sql.go b/migrator/storage_sql.go index d83cca2df..8de827346 100644 --- a/migrator/storage_sql.go +++ b/migrator/storage_sql.go @@ -20,7 +20,6 @@ package migrator import ( "database/sql" - "fmt" "time" "github.com/cgrates/cgrates/engine" @@ -108,7 +107,7 @@ func (mgSQL *migratorSQL) renameV1SMCosts() (err error) { } func (mgSQL *migratorSQL) createV1SMCosts() (err error) { - qry := fmt.Sprint("CREATE TABLE sm_costs ( id int(11) NOT NULL AUTO_INCREMENT, cgrid varchar(40) NOT NULL, run_id varchar(64) NOT NULL, origin_host varchar(64) NOT NULL, origin_id varchar(128) NOT NULL, cost_source varchar(64) NOT NULL, `usage` BIGINT NOT NULL, cost_details MEDIUMTEXT, created_at TIMESTAMP NULL,deleted_at TIMESTAMP NULL, PRIMARY KEY (`id`),UNIQUE KEY costid (cgrid, run_id),KEY origin_idx (origin_host, origin_id),KEY run_origin_idx (run_id, origin_id),KEY deleted_at_idx (deleted_at));") + qry := "CREATE TABLE sm_costs ( id int(11) NOT NULL AUTO_INCREMENT, cgrid varchar(40) NOT NULL, run_id varchar(64) NOT NULL, origin_host varchar(64) NOT NULL, origin_id varchar(128) NOT NULL, cost_source varchar(64) NOT NULL, `usage` BIGINT NOT NULL, cost_details MEDIUMTEXT, created_at TIMESTAMP NULL,deleted_at TIMESTAMP NULL, PRIMARY KEY (`id`),UNIQUE KEY costid (cgrid, run_id),KEY origin_idx (origin_host, origin_id),KEY run_origin_idx (run_id, origin_id),KEY deleted_at_idx (deleted_at));" if mgSQL.StorDB().GetStorageType() == utils.Postgres { qry = ` CREATE TABLE sm_costs ( diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 6dbeaaa49..882b9031e 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -45,8 +45,6 @@ type v2ActionTrigger struct { LastExecutionTime time.Time } -type v2ActionTriggers []*v2ActionTrigger - func (m *Migrator) migrateCurrentThresholds() (err error) { var ids []string //Thresholds diff --git a/migrator/tp_shared_groups.go b/migrator/tp_shared_groups.go index f840316fe..2a8414d4e 100644 --- a/migrator/tp_shared_groups.go +++ b/migrator/tp_shared_groups.go @@ -41,7 +41,7 @@ func (m *Migrator) migrateCurrentTPsharedgroups() (err error) { return err } if sharedGroup != nil { - if m.dryRun != true { + if !m.dryRun { if err := m.storDBOut.StorDB().SetTPSharedGroups(sharedGroup); err != nil { return err } diff --git a/migrator/tp_timings.go b/migrator/tp_timings.go index fd3756fdd..eb17f8bc7 100644 --- a/migrator/tp_timings.go +++ b/migrator/tp_timings.go @@ -41,7 +41,7 @@ func (m *Migrator) migrateCurrentTPTiming() (err error) { return err } if tm != nil { - if m.dryRun != true { + if !m.dryRun { if err := m.storDBOut.StorDB().SetTPTimings(tm); err != nil { return err } diff --git a/registrarc/registrarc.go b/registrarc/registrarc.go index 6fd6a49db..1902201ee 100644 --- a/registrarc/registrarc.go +++ b/registrarc/registrarc.go @@ -122,7 +122,6 @@ func (dhS *RegistrarCService) registerDispHosts() { } } } - return } func (dhS *RegistrarCService) registerRPCHosts() { @@ -143,7 +142,6 @@ func (dhS *RegistrarCService) registerRPCHosts() { } } } - return } func unregisterHosts(connMgr *engine.ConnManager, regCfg *config.RegistrarCCfg, dTnt, method string) { diff --git a/sessions/sessions.go b/sessions/sessions.go index 6e807ae0c..823969cd5 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -106,7 +106,6 @@ func (sS *SessionS) ListenAndServe(stopChan chan struct{}) { } } } - return } // Shutdown is called by engine to clear states @@ -807,7 +806,6 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, connIDs []string) utils.SessionS, sCln.CGRID, err.Error())) } } - return } // registerSession will register an active or passive Session @@ -899,7 +897,6 @@ func (sS *SessionS) indexSession(s *Session, pSessions bool) { ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{fieldName: fieldName, fieldValue: fieldVal}) } } - return } // unindexASession removes an active or passive session from indexes @@ -938,9 +935,6 @@ func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) ( f, err := sS.dm.GetFilter(tenant, fltrID, true, true, utils.NonTransactional) if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefixNotFound(fltrID) - } continue } if f.ActivationInterval != nil && @@ -1848,7 +1842,6 @@ func (args *V1AuthorizeArgs) ParseFlags(flags, sep string) { } } args.Paginator, _ = utils.GetRoutePaginatorFromOpts(args.APIOpts) - return } // V1AuthorizeReply are options available in auth reply @@ -2817,7 +2810,6 @@ func (args *V1ProcessMessageArgs) ParseFlags(flags, sep string) { } } args.Paginator, _ = utils.GetRoutePaginatorFromOpts(args.APIOpts) - return } // V1ProcessMessageReply is the reply for the ProcessMessage API diff --git a/structmatcher/structmatcher.go b/structmatcher/structmatcher.go index 141943e96..8e7655607 100644 --- a/structmatcher/structmatcher.go +++ b/structmatcher/structmatcher.go @@ -267,10 +267,6 @@ func isOperator(s string) bool { return strings.HasPrefix(s, "*") } -func notEmpty(x interface{}) bool { - return !reflect.DeepEqual(x, reflect.Zero(reflect.TypeOf(x)).Interface()) -} - type StructMatcher struct { rootElement element } diff --git a/utils/consts.go b/utils/consts.go index e50851d33..673276699 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2679,14 +2679,6 @@ var ( } ) -// ActionSv1 -const ( - ActionSv1 = "ActionSv1" - ActionSv1Ping = "ActionSv1.Ping" - ActionSv1ScheduleActions = "ActionSv1.ScheduleActions" - ActionSv1ExecuteActions = "ActionSv1.ExecuteActions" -) - // Time duration suffix const ( NsSuffix = "ns"