Tpe dispathcer hosts + fixes and cfg

This commit is contained in:
porosnicuadrian
2022-03-17 18:02:04 +02:00
committed by Dan Christian Bogos
parent 0caf4123d1
commit 10186ff23c
6 changed files with 207 additions and 30 deletions

View File

@@ -61,12 +61,12 @@ var (
testTPeSetActions,
testTPeSetThresholds,
testTPeSetDispatcherProfiles,
testSeTPeSetDispatcherHosts,
testTPeSExportTariffPlanHalfTariffPlan,
testTPeSExportTariffPlanAllTariffPlan,
// export again after we will flush the database
testTPeSInitDataDb,
testTPeSKillEngine,
testTPeSInitCfg,
testTPeSStartEngine,
testTPeSRPCConn,
@@ -78,7 +78,7 @@ var (
func TestTPeSIT(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
tpeSConfigDIR = "tutinternal"
tpeSConfigDIR = "tpe_internal"
case utils.MetaMongo:
tpeSConfigDIR = "tutmongo"
case utils.MetaMySQL:
@@ -991,23 +991,66 @@ func testTPeSetDispatcherProfiles(t *testing.T) {
}
}
func testSeTPeSetDispatcherHosts(t *testing.T) {
dspPrf := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "DSH1",
Address: "*internal",
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
}
var result string
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetDispatcherHost, dspPrf, &result); err != nil {
t.Fatal(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
dspPrf2 := &engine.DispatcherHostWithAPIOpts{
DispatcherHost: &engine.DispatcherHost{
Tenant: "cgrates.org",
RemoteHost: &config.RemoteHost{
ID: "DSH2",
Address: "127.0.0.1:6012",
Transport: utils.MetaJSON,
ConnectAttempts: 1,
Reconnects: 3,
ConnectTimeout: time.Minute,
ReplyTimeout: 2 * time.Minute,
},
},
}
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetDispatcherHost, dspPrf2, &result); err != nil {
t.Fatal(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func testTPeSExportTariffPlanHalfTariffPlan(t *testing.T) {
var replyBts []byte
// we will get only the wantes tariff plans in the csv format
if err := tpeSRPC.Call(context.Background(), utils.TPeSv1ExportTariffPlan, &tpes.ArgsExportTP{
Tenant: "cgrates.org",
ExportItems: map[string][]string{
utils.MetaAttributes: {"TEST_ATTRIBUTES_IT_TEST"},
utils.MetaResources: {"ResGroup1"},
utils.MetaFilters: {"fltr_for_prf"},
utils.MetaRateS: {"MultipleRates"},
utils.MetaChargers: {"Chargers1"},
utils.MetaRoutes: {"ROUTE_2003"},
utils.MetaAccounts: {"Account_balances"},
utils.MetaStats: {"SQ_basic"},
utils.MetaActions: {"Execute_thd"},
utils.MetaThresholds: {"TH_Stats1"},
utils.MetaDispatchers: {"Dsp1"},
utils.MetaAttributes: {"TEST_ATTRIBUTES_IT_TEST"},
utils.MetaResources: {"ResGroup1"},
utils.MetaFilters: {"fltr_for_prf"},
utils.MetaRateS: {"MultipleRates"},
utils.MetaChargers: {"Chargers1"},
utils.MetaRoutes: {"ROUTE_2003"},
utils.MetaAccounts: {"Account_balances"},
utils.MetaStats: {"SQ_basic"},
utils.MetaActions: {"Execute_thd"},
utils.MetaThresholds: {"TH_Stats1"},
utils.MetaDispatchers: {"Dsp1"},
utils.MetaDispatcherHosts: {"DSH1"},
},
}, &replyBts); err != nil {
t.Error(err)
@@ -1097,6 +1140,10 @@ func testTPeSExportTariffPlanHalfTariffPlan(t *testing.T) {
{"#Tenant", "ID", "FilterIDs", "Weight", "Strategy", "StrategyParameters", "ConnID", "ConnFilterIDs", "ConnWeight", "ConnBlocker", "ConnParameters"},
{"cgrates.org", "Dsp1", "*string:~*req.Account:1001;*ai:~*req.AnswerTime:2014-07-14T14:25:00Z", "20", "*first", "false", "C1", "", "10", "false", "192.168.54.203"},
},
utils.DispatcherHostsCsv: {
{"#Tenant", "ID", "Address", "Transport", "ConnectAttempts", "Reconnects", "ConnectTimeout", "ReplyTimeout", "Tls", "ClientKey", "ClientCertificate", "CaCertificate"},
{"cgrates.org", "DSH1", "*internal", "", "1", "3", "1m0s", "2m0s", "false", "", "", ""},
},
}
expected[utils.RatesCsv] = csvRply[utils.RatesCsv]
expected[utils.AccountsCsv] = csvRply[utils.AccountsCsv]
@@ -1112,17 +1159,18 @@ func testTPeSExportTariffPlanAllTariffPlan(t *testing.T) {
if err := tpeSRPC.Call(context.Background(), utils.TPeSv1ExportTariffPlan, &tpes.ArgsExportTP{
Tenant: "cgrates.org",
ExportItems: map[string][]string{
utils.MetaAttributes: {"TEST_ATTRIBUTES_IT_TEST", "TEST_ATTRIBUTES_IT_TEST_SECOND"},
utils.MetaResources: {"ResGroup1", "ResGroup2"},
utils.MetaFilters: {"fltr_for_prf", "fltr_changed2"},
utils.MetaRateS: {"MultipleRates", "TEST_RATE_IT_TEST"},
utils.MetaChargers: {"Chargers1", "DifferentCharger"},
utils.MetaRoutes: {"ROUTE_2003", "ROUTE_ACNT_1001"},
utils.MetaAccounts: {"Account_balances", "Account_simple"},
utils.MetaStats: {"SQ_basic", "SQ_2"},
utils.MetaActions: {"Execute_thd", "SET_BAL"},
utils.MetaThresholds: {"TH_Stats1", "THD_2"},
utils.MetaDispatchers: {"Dsp1", "Dsp2"},
utils.MetaAttributes: {"TEST_ATTRIBUTES_IT_TEST", "TEST_ATTRIBUTES_IT_TEST_SECOND"},
utils.MetaResources: {"ResGroup1", "ResGroup2"},
utils.MetaFilters: {"fltr_for_prf", "fltr_changed2"},
utils.MetaRateS: {"MultipleRates", "TEST_RATE_IT_TEST"},
utils.MetaChargers: {"Chargers1", "DifferentCharger"},
utils.MetaRoutes: {"ROUTE_2003", "ROUTE_ACNT_1001"},
utils.MetaAccounts: {"Account_balances", "Account_simple"},
utils.MetaStats: {"SQ_basic", "SQ_2"},
utils.MetaActions: {"Execute_thd", "SET_BAL"},
utils.MetaThresholds: {"TH_Stats1", "THD_2"},
utils.MetaDispatchers: {"Dsp1", "Dsp2"},
utils.MetaDispatcherHosts: {"DSH1", "DSH2"},
},
}, &replyBts); err != nil {
t.Error(err)
@@ -1232,6 +1280,11 @@ func testTPeSExportTariffPlanAllTariffPlan(t *testing.T) {
{"cgrates.org", "Dsp1", "*string:~*req.Account:1001;*ai:~*req.AnswerTime:2014-07-14T14:25:00Z", "20", "*first", "false", "C1", "", "10", "false", "192.168.54.203"},
{"cgrates.org", "Dsp2", "*string:~*opts.EventType:LoadDispatcher", "10", "*weight", "", "Conn2", "*suffix:~*opts.*answerTime:45T", "0", "false", "*ratio:1"},
},
utils.DispatcherHostsCsv: {
{"#Tenant", "ID", "Address", "Transport", "ConnectAttempts", "Reconnects", "ConnectTimeout", "ReplyTimeout", "Tls", "ClientKey", "ClientCertificate", "CaCertificate"},
{"cgrates.org", "DSH1", "*internal", "", "1", "3", "1m0s", "2m0s", "false", "", "", ""},
{"cgrates.org", "DSH2", "127.0.0.1:6012", "*json", "1", "3", "1m0s", "2m0s", "false", "", "", ""},
},
}
expected[utils.RatesCsv] = csvRply[utils.RatesCsv]
expected[utils.AccountsCsv] = csvRply[utils.AccountsCsv]

View File

@@ -0,0 +1,49 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
"reply_timeout": "50s"
},
"data_db": {
"db_type": "*internal"
},
"sessions": {
"enabled": true,
},
"admins": {
"enabled": true,
},
"tpes": {
"enabled": true
},
"caches":{
"partitions": {
"*resource_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control resource profiles caching
// control resources caching
"*statqueue_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // statqueue profiles
"*threshold_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control threshold profiles caching
"*filters": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control filters caching
"*route_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control route profile caching
"*attribute_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control attribute profile caching
"*charger_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control charger profile caching
"*dispatcher_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher profile caching
"*dispatcher_hosts": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control dispatcher hosts caching
"*rate_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control rate profile caching
"*action_profiles": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control action profile caching
"*accounts": {"limit": 0, "ttl": "", "static_ttl": false, "precache": false, "replicate": false}, // control account profile caching
},
"replication_conns": [],
},
}

View File

@@ -63,7 +63,6 @@
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rals_conns": ["*internal"]
},
@@ -72,7 +71,6 @@
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"]
},
@@ -80,7 +78,6 @@
"admins": {
"enabled": true,
"scheduler_conns": ["*internal"]
},

View File

@@ -0,0 +1,75 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package tpes
import (
"encoding/csv"
"fmt"
"io"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type TPDispatcherHosts struct {
dm *engine.DataManager
}
// newTPDispatcherHosts is the constructor for TPDispatcherHosts
func newTPDispatcherHosts(dm *engine.DataManager) *TPDispatcherHosts {
return &TPDispatcherHosts{
dm: dm,
}
}
// exportItems for TPDispatcherHosts will implement the method for tpExporter interface
func (tpDspHst TPDispatcherHosts) exportItems(ctx *context.Context, wrtr io.Writer, tnt string, itmIDs []string) (err error) {
csvWriter := csv.NewWriter(wrtr)
csvWriter.Comma = utils.CSVSep
// before writing the profiles, we must write the headers
if err = csvWriter.Write([]string{"#Tenant", "ID", "Address", "Transport", "ConnectAttempts", "Reconnects", "ConnectTimeout", "ReplyTimeout", "Tls", "ClientKey", "ClientCertificate", "CaCertificate"}); err != nil {
return
}
for _, dspHostID := range itmIDs {
var dspHostPrf *engine.DispatcherHost
dspHostPrf, err = tpDspHst.dm.GetDispatcherHost(ctx, tnt, dspHostID, true, true, utils.NonTransactional)
if err != nil {
if err.Error() == utils.ErrNotFound.Error() {
return fmt.Errorf("<%s> cannot find DispatcherHost with id: <%v>", err, dspHostID)
}
return err
}
dspHstsMdl := engine.APItoModelTPDispatcherHost(engine.DispatcherHostToAPI(dspHostPrf))
// for every profile, convert it into model to be compatible in csv format
// transform every record into a []string
record, err := engine.CsvDump(dspHstsMdl)
if err != nil {
return err
}
// record is a line of a csv file
if err := csvWriter.Write(record); err != nil {
return err
}
}
csvWriter.Flush()
return
}

View File

@@ -89,6 +89,8 @@ func getTariffPlansKeys(ctx *context.Context, dm *engine.DataManager, tnt, expTy
prfx = utils.ThresholdProfilePrefix + tnt + utils.ConcatenatedKeySep
case utils.MetaDispatchers:
prfx = utils.DispatcherProfilePrefix + tnt + utils.ConcatenatedKeySep
case utils.MetaDispatcherHosts:
prfx = utils.DispatcherHostPrefix + tnt + utils.ConcatenatedKeySep
default:
return nil, fmt.Errorf("Unsuported exporter type")
}

View File

@@ -38,9 +38,8 @@ var tpExporterTypes = utils.NewStringSet([]string{
utils.MetaActions,
utils.MetaThresholds,
utils.MetaDispatchers,
/*
utils.MetaDispatcherHosts, //
*/})
utils.MetaDispatcherHosts,
})
var exportFileName = map[string]string{
utils.MetaAttributes: utils.AttributesCsv,
@@ -87,6 +86,8 @@ func newTPExporter(expType string, dm *engine.DataManager) (tpE tpExporter, err
return newTPThresholds(dm), nil
case utils.MetaDispatchers:
return newTPDispatchers(dm), nil
case utils.MetaDispatcherHosts:
return newTPDispatcherHosts(dm), nil
default:
return nil, utils.ErrPrefix(utils.ErrUnsupportedTPExporterType, expType)
}