diff --git a/general_tests/cdrs_onlexp_it_test.go b/general_tests/cdrs_onlexp_it_test.go index 8c9cc4c67..cb62bfe9f 100644 --- a/general_tests/cdrs_onlexp_it_test.go +++ b/general_tests/cdrs_onlexp_it_test.go @@ -25,11 +25,14 @@ import ( "context" "encoding/json" "fmt" + "net" "net/url" "os" "os/exec" "path" "reflect" + "sort" + "strconv" "testing" "time" @@ -46,8 +49,8 @@ var ( cdrsMasterCfgDIR, cdrsSlaveCfgDIR string cdrsMasterCfg, cdrsSlaveCfg *config.CGRConfig cdrsMasterRpc *rpcclient.RPCClient - httpCGRID = utils.UUIDSha1Prefix() - amqpCGRID = utils.UUIDSha1Prefix() + httpCGRID = "1234abcd" + amqpCGRID = "5678wxyz" failoverContent = []interface{}{[]byte(fmt.Sprintf(`{"CGRID":"%s"}`, httpCGRID)), []byte(fmt.Sprintf(`{"CGRID":"%s"}`, amqpCGRID))} sTestsCDRsOnExp = []func(t *testing.T){ @@ -56,6 +59,7 @@ var ( testCDRsOnExpStartMasterEngine, testCDRsOnExpStartSlaveEngine, testCDRsOnExpAMQPQueuesCreation, + testCDRsOnExpKafkaPosterCreateTopic, testCDRsOnExpInitMasterRPC, testCDRsOnExpLoadDefaultCharger, testCDRsOnExpDisableOnlineExport, @@ -63,6 +67,7 @@ var ( testCDRsOnExpAMQPReplication, testCDRsOnExpFileFailover, testCDRsOnExpKafkaPosterFileFailover, + testCDRsOnExpKafkaPosterDeleteTopic, testCDRsOnExpStopEngine, } ) @@ -346,7 +351,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { if rcvCDR[utils.CGRID] != httpCGRID { t.Errorf("Unexpected CDR received: %+v", rcvCDR) } - case <-time.After(time.Duration(100 * time.Millisecond)): + case <-time.After(time.Duration(2 * time.Second)): t.Error("No message received from RabbitMQ") } if msgs, err = ch.Consume(q1.Name, "consumer", true, false, false, false, nil); err != nil { @@ -362,7 +367,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { if rcvCDR[utils.CGRID] != httpCGRID { t.Errorf("Unexpected CDR received: %+v", rcvCDR) } - case <-time.After(time.Duration(100 * time.Millisecond)): + case <-time.After(time.Duration(2 * time.Second)): t.Error("No message received from RabbitMQ") } conn.Close() @@ -428,7 +433,7 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { if rcvCDR[utils.CGRID] != testCdr.CGRID { t.Errorf("Unexpected CDR received: %+v", rcvCDR) } - case <-time.After(150 * time.Millisecond): + case <-time.After(2 * time.Second): t.Error("No message received from RabbitMQ") } @@ -444,10 +449,18 @@ func testCDRsOnExpAMQPReplication(t *testing.T) { if rcvCDR[utils.CGRID] != testCdr.CGRID { t.Errorf("Unexpected CDR received: %s expeced: %s", utils.ToJSON(rcvCDR), utils.ToJSON(testCdr)) } - case <-time.After(150 * time.Millisecond): + case <-time.After(2 * time.Second): t.Error("No message received from RabbitMQ") } + // Delete both queues once we are done verifying the exports + if _, err := ch.QueueDelete(q.Name, false, false, false); err != nil { + t.Errorf("Failed to delete queue named %s, err: <%s>", q.Name, err.Error()) + } + if _, err := ch.QueueDelete(q1.Name, false, false, false); err != nil { + t.Errorf("Failed to delete queue named %s, err: <%s>", q1.Name, err.Error()) + } + } func checkContent(ev *engine.ExportEvents, content []interface{}) error { @@ -517,25 +530,96 @@ func testCDRsOnExpFileFailover(t *testing.T) { } } +func testCDRsOnExpKafkaPosterCreateTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + controller, err := conn.Controller() + if err != nil { + t.Fatal(err) + } + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if err != nil { + t.Fatal(err) + } + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: "cgrates_cdrs", + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + err = controllerConn.CreateTopics(topicConfigs...) + if err != nil { + t.Fatal(err) + } +} + func testCDRsOnExpKafkaPosterFileFailover(t *testing.T) { reader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: "cgrates_cdrs", - GroupID: "tmp", - MaxWait: time.Millisecond, + Brokers: []string{"localhost:9092"}, + Topic: "cgrates_cdrs", + Partition: 0, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB }) - defer reader.Close() - - for i := 0; i < 2; i++ { // no raw CDR - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if m, err := reader.ReadMessage(ctx); err != nil { - t.Fatal(err) - } else if !reflect.DeepEqual(failoverContent[0].([]byte), m.Value) && !reflect.DeepEqual(failoverContent[1].([]byte), m.Value) { // Checking just the prefix should do since some content is dynamic - t.Errorf("Expecting: %v or %v, received: %v", utils.IfaceAsString(failoverContent[0]), utils.IfaceAsString(failoverContent[1]), string(m.Value)) - } - cancel() + expCDRs := []string{ + string(failoverContent[0].([]byte)), + string(failoverContent[1].([]byte)), } + rcvCDRs := make([]string, 2) + for i := 0; i < 2; i++ { // no raw CDR + m, err := reader.ReadMessage(context.Background()) + if err != nil { + t.Errorf("Failed to read message nr. %d: %s", i, err.Error()) + break + } + rcvCDRs[i] = string(m.Value) + } + + sort.Strings(rcvCDRs) + if !reflect.DeepEqual(rcvCDRs, expCDRs) { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", expCDRs, rcvCDRs) + } + + if err := reader.Close(); err != nil { + t.Fatal("failed to close reader:", err) + } +} + +func testCDRsOnExpKafkaPosterDeleteTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", "localhost:9092") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + partitions, err := conn.ReadPartitions("cgrates_cdrs") + if err != nil { + t.Fatal(err) + } + + if len(partitions) != 1 || partitions[0].Topic != "cgrates_cdrs" { + t.Fatal("expected topic named cgrates_cdrs to exist") + } + + if err := conn.DeleteTopics("cgrates_cdrs"); err != nil { + t.Fatal(err) + } + + experr := `[5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes` + _, err = conn.ReadPartitions("cgrates_cdrs") + if err == nil || err.Error() != experr { + t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err) + } + } /* diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go index f992ce0ca..9a1d3fa3b 100644 --- a/general_tests/cdrs_post_failover_it_test.go +++ b/general_tests/cdrs_post_failover_it_test.go @@ -95,6 +95,15 @@ func testCDRsPostFailoverInitCdrDb(t *testing.T) { } func testCDRsPostFailoverStartEngine(t *testing.T) { + // before starting the engine, create the directories needed for failed posts or + // clear their contents if they exist already + if err := os.RemoveAll(cdrsPostFailCfg.GeneralCfg().FailedPostsDir); err != nil { + t.Fatal("Error removing folder: ", cdrsPostFailCfg.GeneralCfg().FailedPostsDir, err) + } + if err := os.MkdirAll(cdrsPostFailCfg.GeneralCfg().FailedPostsDir, 0755); err != nil { + t.Error(err) + } + if _, err := engine.StopStartEngine(cdrsPostFailCfgPath, *waitRater); err != nil { t.Fatal(err) } diff --git a/general_tests/cdrs_processevent_it_test.go b/general_tests/cdrs_processevent_it_test.go index 23aff94c2..ce82e3ecb 100644 --- a/general_tests/cdrs_processevent_it_test.go +++ b/general_tests/cdrs_processevent_it_test.go @@ -31,7 +31,6 @@ import ( "testing" "time" - v1 "github.com/cgrates/cgrates/apier/v1" v2 "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -45,12 +44,15 @@ var ( pecdrsRpc *rpc.Client sTestsCDRsIT_ProcessEvent = []func(t *testing.T){ + testV1CDRsRemoveFolders, + testV1CDRsCreateFolders, testV1CDRsInitConfig, testV1CDRsInitDataDb, testV1CDRsInitCdrDb, testV1CDRsStartEngine, testV1CDRsRpcConn, - testV1CDRsLoadTariffPlanFromFolder, + testV1CDRsLoadTPs, + testV1CDRsProcessEventExport, testV1CDRsProcessEventAttrS, testV1CDRsProcessEventChrgS, @@ -59,7 +61,9 @@ var ( testV1CDRsProcessEventStore, testV1CDRsProcessEventThreshold, testV1CDRsProcessEventExportCheck, + testV1CDRsKillEngine, + testV1CDRsRemoveFolders, } ) @@ -102,6 +106,15 @@ func testV1CDRsInitCdrDb(t *testing.T) { } func testV1CDRsStartEngine(t *testing.T) { + // before starting the engine, create the directories needed for failed posts or + // clear their contents if they exist already + if err := os.RemoveAll(pecdrsCfg.GeneralCfg().FailedPostsDir); err != nil { + t.Fatal("Error removing folder: ", pecdrsCfg.GeneralCfg().FailedPostsDir, err) + } + if err := os.MkdirAll(pecdrsCfg.GeneralCfg().FailedPostsDir, 0755); err != nil { + t.Error(err) + } + if _, err := engine.StopStartEngine(pecdrsCfgPath, *waitRater); err != nil { t.Fatal(err) } @@ -115,43 +128,121 @@ func testV1CDRsRpcConn(t *testing.T) { } } -func testV1CDRsLoadTariffPlanFromFolder(t *testing.T) { +func testV1CDRsLoadTPs(t *testing.T) { + writeFile := func(fileName, data string) error { + csvFile, err := os.Create(path.Join("/tmp/TestCDRsITPE", fileName)) + if err != nil { + return err + } + defer csvFile.Close() + _, err = csvFile.WriteString(data) + if err != nil { + return err + + } + return csvFile.Sync() + } + + // Create and populate AccountActions.csv + if err := writeFile(utils.AccountActionsCsv, ` +#Tenant,Account,ActionPlanId,ActionTriggersId,AllowNegative,Disabled +cgrates.org,1001,PACKAGE_1001,,, +`); err != nil { + t.Fatal(err) + } + + // Create and populate ActionPlans.csv + if err := writeFile(utils.ActionPlansCsv, ` +#Id,ActionsId,TimingId,Weight +PACKAGE_1001,TOPUP_RST_MONETARY_10,*asap,10 +`); err != nil { + t.Fatal(err) + } + + // Create and populate Actions.csv + if err := writeFile(utils.ActionsCsv, ` +#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16] +TOPUP_RST_MONETARY_10,*topup_reset,,,,*monetary,,*any,,,*unlimited,,10,10,false,false,10 +TOPUP_MONETARY_10,*topup,,,,*monetary,,*any,,,*unlimited,,10,10,false,false,10 +`); err != nil { + t.Fatal(err) + } + + // Create and populate Attributes.csv + if err := writeFile(utils.AttributesCsv, ` +#Tenant,ID,Contexts,FilterIDs,ActivationInterval,AttributeFilterIDs,Path,Type,Value,Blocker,Weight +cgrates.org,ATTR_SUPPLIER1,*chargers,,,,*req.Subject,*constant,SUPPLIER1,false,10 +cgrates.org,ATTR_SubjChange,,,,,*req.Subject,*constant,1011,false,10 +`); err != nil { + t.Fatal(err) + } + + // Create and populate Chargers.csv + if err := writeFile(utils.ChargersCsv, ` +#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight +cgrates.org,Raw,,,*raw,*constant:*req.RequestType:*none,20 +cgrates.org,CustomerCharges,,,CustomerCharges,*none,20 +cgrates.org,SupplierCharges,,,SupplierCharges,ATTR_SUPPLIER1,10 +`); err != nil { + t.Fatal(err) + } + + // Create and populate DestinationRates.csv + if err := writeFile(utils.DestinationRatesCsv, ` +#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy +DR_ANY_1CNT,*any,RT_1CNT,*up,5,0, +`); err != nil { + t.Fatal(err) + } + + // Create and populate Rates.csv + if err := writeFile(utils.RatesCsv, ` +#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart +RT_1CNT,0,0.01,60s,1s,0s +`); err != nil { + t.Fatal(err) + } + + // Create and populate RatingPlans.csv + if err := writeFile(utils.RatingPlansCsv, ` +#Id,DestinationRatesId,TimingTag,Weight +RP_TESTIT1,DR_ANY_1CNT,*any,10 +`); err != nil { + t.Fatal(err) + } + + // Create and populate RatingProfiles.csv + if err := writeFile(utils.RatingProfilesCsv, ` +#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject +cgrates.org,call,*any,2018-01-01T00:00:00Z,RP_TESTIT1, +`); err != nil { + t.Fatal(err) + } + + // Create and populate Stats.csv + if err := writeFile(utils.StatsCsv, ` +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] +cgrates.org,Stat_1,*string:~*req.Account:1001,2014-07-29T15:00:00Z,100,5s,0,*acd;*tcd;*asr,,false,true,30,*none +`); err != nil { + t.Fatal(err) + } + + // Create and populate Thresholds.csv + if err := writeFile(utils.ThresholdsCsv, ` +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],Weight[8],ActionIDs[9],Async[10] +cgrates.org,THD_ACNT_1001,*string:~*req.Account:1001,2014-07-29T15:00:00Z,-1,0,0,false,10,TOPUP_MONETARY_10,false +`); err != nil { + t.Fatal(err) + } + var loadInst string if err := pecdrsRpc.Call(utils.APIerSv1LoadTariffPlanFromFolder, - &utils.AttrLoadTpFromFolder{FolderPath: path.Join( - *dataDir, "tariffplans", "testit")}, &loadInst); err != nil { + &utils.AttrLoadTpFromFolder{FolderPath: "/tmp/TestCDRsITPE"}, &loadInst); err != nil { t.Error(err) } - time.Sleep(time.Duration(*waitRater) * time.Millisecond) } func testV1CDRsProcessEventAttrS(t *testing.T) { - var acnt *engine.Account - acntAttrs := &utils.AttrGetAccount{ - Tenant: "cgrates.org", - Account: "test1_processEvent"} - attrSetBalance := utils.AttrSetBalance{ - Tenant: acntAttrs.Tenant, - Account: acntAttrs.Account, - BalanceType: utils.VOICE, - Value: 120000000000, - Balance: map[string]interface{}{ - utils.ID: "BALANCE1", - utils.Weight: 20, - }, - } - var reply string - if err := pecdrsRpc.Call(utils.APIerSv1SetBalance, attrSetBalance, &reply); err != nil { - t.Error(err) - } else if reply != utils.OK { - t.Errorf("received: %s", reply) - } - expectedVoice := 120000000000.0 - if err := pecdrsRpc.Call(utils.APIerSv2GetAccount, acntAttrs, &acnt); err != nil { - t.Error(err) - } else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != expectedVoice { - t.Errorf("Expecting: %v, received: %v", expectedVoice, rply) - } argsEv := &engine.ArgV1ProcessEvent{ Flags: []string{utils.MetaAttributes, utils.MetaStore, "*chargers:false", "*export:false"}, CGREvent: utils.CGREvent{ @@ -170,37 +261,7 @@ func testV1CDRsProcessEventAttrS(t *testing.T) { }, } var cdrs []*engine.CDR - alsPrf := &v1.AttributeWithCache{ - AttributeProfile: &engine.AttributeProfile{ - Tenant: "cgrates.org", - ID: "ApierTest", - Contexts: []string{utils.META_ANY}, - FilterIDs: []string{"*string:~*req.Account:1001"}, - Attributes: []*engine.Attribute{ - { - Path: utils.MetaReq + utils.NestingSep + utils.Subject, - Value: config.NewRSRParsersMustCompile("1011", true, utils.INFIELD_SEP), - }, - }, - Weight: 20, - }, - } - alsPrf.Compile() - var result string - if err := pecdrsRpc.Call(utils.APIerSv1SetAttributeProfile, alsPrf, &result); err != nil { - t.Error(err) - } else if result != utils.OK { - t.Error("Unexpected reply returned", result) - } - var replyAt *engine.AttributeProfile - if err := pecdrsRpc.Call(utils.APIerSv1GetAttributeProfile, &utils.TenantIDWithArgDispatcher{ - TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "ApierTest"}}, &replyAt); err != nil { - t.Fatal(err) - } - replyAt.Compile() - if !reflect.DeepEqual(alsPrf.AttributeProfile, replyAt) { - t.Errorf("Expecting : %+v, received: %+v", utils.ToJSON(alsPrf.AttributeProfile), utils.ToJSON(replyAt)) - } + var reply string if err := pecdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err != nil { t.Error(err) } else if reply != utils.OK { @@ -230,10 +291,9 @@ func testV1CDRsProcessEventAttrS(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", argsEv.Event["Usage"], cdrs[0].Usage) } else if !reflect.DeepEqual(argsEv.Tenant, cdrs[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", argsEv.Tenant, cdrs[0].Tenant) - } else if !reflect.DeepEqual(alsPrf.Attributes[0].Value[0].Rules, cdrs[0].Subject) { - t.Errorf("Expecting: %+v, received: %+v", alsPrf.Attributes[0].Value[0].Rules, cdrs[0].Subject) + } else if cdrs[0].Subject != "1011" { + t.Errorf("Expecting: %+v, received: %+v", "1011", cdrs[0].Subject) } - return } func testV1CDRsProcessEventChrgS(t *testing.T) { @@ -603,3 +663,15 @@ func testV1CDRsKillEngine(t *testing.T) { t.Error(err) } } + +func testV1CDRsCreateFolders(t *testing.T) { + if err := os.MkdirAll("/tmp/TestCDRsITPE", 0755); err != nil { + t.Error(err) + } +} + +func testV1CDRsRemoveFolders(t *testing.T) { + if err := os.RemoveAll("/tmp/TestCDRsITPE"); err != nil { + t.Error(err) + } +} diff --git a/general_tests/libtest.go b/general_tests/libtest.go index ce39d19fe..cdaf55da8 100644 --- a/general_tests/libtest.go +++ b/general_tests/libtest.go @@ -29,7 +29,7 @@ import ( var ( dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") - waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") + waitRater = flag.Int("wait_rater", 500, "Number of miliseconds to wait for rater to start and cache") encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") dbType = flag.String("dbtype", utils.MetaInternal, "The type of DataBase (Internal/Mongo/mySql)") err error