Fix merge conflicts, improving creation of new rpc pool

This commit is contained in:
DanB
2016-03-24 09:18:38 +01:00
155 changed files with 8685 additions and 3439 deletions

View File

@@ -71,7 +71,7 @@ var (
err error
)
func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, exitChan chan bool) {
func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case)
var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork
for {
@@ -102,7 +102,7 @@ func startCdrcs(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan
}
// Fires up a cdrc instance
func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *engine.Responder, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool,
func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool,
closeChan chan struct{}, exitChan chan bool) {
var cdrsConn rpcclient.RpcClientConnection
var cdrcCfg *config.CdrcConfig
@@ -136,50 +136,29 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
}
}
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, server *utils.Server, exitChan chan bool) {
func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Generic service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
var client *rpcclient.RpcClient
var err error
// Connect to rater
for _, raterCfg := range cfg.SmGenericConfig.RaterConns {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn.AddClient(resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-Generic> Could not connect to Rater via RPC: %v", err))
exitChan <- true
return
}
raterConn.AddClient(client)
}
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmGenericConfig.RaterConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmGenericConfig.CdrsConns, cfg.SmGenericConfig.RaterConns) {
cdrsConn = raterConn
} else if len(cfg.SmGenericConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmGenericConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-Generic> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn.AddClient(client)
}
var cdrsConn *rpcclient.RpcClientPool
if reflect.DeepEqual(cfg.SmGenericConfig.RaterConns, cfg.SmGenericConfig.CdrsConns) {
cdrsConn = ralConn
} else {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmGenericConfig.CdrsConns, internalCDRSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
}
smg_econns := sessionmanager.NewSMGExternalConnections()
sm := sessionmanager.NewSMGeneric(cfg, raterConn, cdrsConn, cfg.DefaultTimezone, smg_econns)
sm := sessionmanager.NewSMGeneric(cfg, ralConn, cdrsConn, cfg.DefaultTimezone, smg_econns)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Generic> error: %s!", err))
}
@@ -197,28 +176,23 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
server.BijsonRegisterOnDisconnect(smg_econns.OnClientDisconnect)
}
func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
smgConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
var client *rpcclient.RpcClient
var err error
for _, smgCfg := range cfg.DiameterAgentCfg().SMGenericConns {
if smgCfg.Server == utils.INTERNAL {
smgRpc := <-internalSMGChan
internalSMGChan <- smgRpc
client, _ = rpcclient.NewRpcClient("", "", 0, 0, rpcclient.INTERNAL_RPC, smgRpc)
smgConn.AddClient(client)
} else {
client, err = rpcclient.NewRpcClient("tcp", smgCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
return
}
smgConn.AddClient(client)
}
smgConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
return
}
da, err := agents.NewDiameterAgent(cfg, smgConn)
pubsubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
exitChan <- true
return
}
da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
exitChan <- true
@@ -230,47 +204,23 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
exitChan <- true
}
func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
var client *rpcclient.RpcClient
var err error
// Connect to rater
for _, raterCfg := range cfg.SmFsConfig.RaterConns {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn.AddClient(resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
raterConn.AddClient(client)
}
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmFsConfig.RaterConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Connect to CDRS
if len(cfg.SmFsConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmFsConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn.AddClient(client)
}
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmFsConfig.CdrsConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, raterConn, cdrsConn, cfg.DefaultTimezone)
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, ralConn, cdrsConn, cfg.DefaultTimezone)
smRpc.SMs = append(smRpc.SMs, sm)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-FreeSWITCH> error: %s!", err))
@@ -278,49 +228,23 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, internalCDRSCha
exitChan <- true
}
func startSmKamailio(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-Kamailio service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
var client *rpcclient.RpcClient
var err error
// Connect to rater
for _, raterCfg := range cfg.SmKamConfig.RaterConns {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn.AddClient(resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
raterConn.AddClient(client)
}
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmKamConfig.RaterConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmKamConfig.CdrsConns, cfg.SmKamConfig.RaterConns) {
cdrsConn = raterConn
} else if len(cfg.SmKamConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmKamConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn.AddClient(client)
}
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmKamConfig.CdrsConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, cfg.DefaultTimezone)
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, ralConn, cdrsConn, cfg.DefaultTimezone)
smRpc.SMs = append(smRpc.SMs, sm)
if err = sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-Kamailio> error: %s!", err))
@@ -328,49 +252,23 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, internalCDRSChan
exitChan <- true
}
func startSmOpenSIPS(internalRaterChan chan *engine.Responder, internalCDRSChan chan *engine.CdrServer, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
cdrsConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
var client *rpcclient.RpcClient
var err error
// Connect to rater
for _, raterCfg := range cfg.SmOsipsConfig.RaterConns {
if raterCfg.Server == utils.INTERNAL {
resp := <-internalRaterChan
raterConn.AddClient(resp)
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to rater via RPC: %v", err))
exitChan <- true
return
}
raterConn.AddClient(client)
}
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmOsipsConfig.RaterConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Connect to CDRS
if reflect.DeepEqual(cfg.SmOsipsConfig.CdrsConns, cfg.SmOsipsConfig.RaterConns) {
cdrsConn = raterConn
} else if len(cfg.SmOsipsConfig.CdrsConns) != 0 {
for _, cdrsCfg := range cfg.SmOsipsConfig.CdrsConns {
if cdrsCfg.Server == utils.INTERNAL {
resp := <-internalCDRSChan
cdrsConn.AddClient(resp)
internalCDRSChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn.AddClient(client)
}
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.SmOsipsConfig.CdrsConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, raterConn, cdrsConn, cfg.DefaultTimezone)
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, cfg.Reconnects, ralConn, cdrsConn, cfg.DefaultTimezone)
smRpc.SMs = append(smRpc.SMs, sm)
if err := sm.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
@@ -378,92 +276,53 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, internalCDRSChan
exitChan <- true
}
func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, cdrDb engine.CdrStorage,
internalRaterChan chan *engine.Responder, internalPubSubSChan chan rpcclient.RpcClientConnection,
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine.LogStorage, cdrDb engine.CdrStorage,
internalRaterChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS CDRS service.")
var err error
var client *rpcclient.RpcClient
// Rater connection init
raterConn := rpcclient.NewRpcClientPool(rpcclient.POOL_FIRST)
for _, raterCfg := range cfg.CDRSRaterConns {
if raterCfg.Server == utils.INTERNAL {
responder := <-internalRaterChan // Wait for rater to come up before start querying
raterConn.AddClient(responder)
internalRaterChan <- responder // Put back the connection since there might be other entities waiting for it
} else if len(raterCfg.Server) != 0 {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to rater: %s", err.Error()))
exitChan <- true
return
}
raterConn.AddClient(client)
}
// Conn pool towards RAL
ralConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSRaterConns, internalRaterChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
return
}
// Pubsub connection init
var pubSubConn rpcclient.RpcClientConnection
if cfg.CDRSPubSub == utils.INTERNAL {
pubSubs := <-internalPubSubSChan
pubSubConn = pubSubs
internalPubSubSChan <- pubSubs
} else if len(cfg.CDRSPubSub) != 0 {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to pubsub server: %s", err.Error()))
exitChan <- true
return
}
pubSubConn = client
pubSubConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSPubSubSConns, internalPubSubSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
exitChan <- true
return
}
// Users connection init
var usersConn rpcclient.RpcClientConnection
if cfg.CDRSUsers == utils.INTERNAL {
userS := <-internalUserSChan
usersConn = userS
internalUserSChan <- userS
} else if len(cfg.CDRSUsers) != 0 {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to users server: %s", err.Error()))
exitChan <- true
return
}
usersConn = client
usersConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSUserSConns, internalUserSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
exitChan <- true
return
}
// Aliases connection init
var aliasesConn rpcclient.RpcClientConnection
if cfg.CDRSAliases == utils.INTERNAL {
aliaseS := <-internalAliaseSChan
aliasesConn = aliaseS
internalAliaseSChan <- aliaseS
} else if len(cfg.CDRSAliases) != 0 {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to aliases server: %s", err.Error()))
exitChan <- true
return
}
aliasesConn = client
aliasesConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSAliaseSConns, internalAliaseSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
exitChan <- true
return
}
// Stats connection init
var statsConn rpcclient.RpcClientConnection
if cfg.CDRSStats == utils.INTERNAL {
statS := <-internalCdrStatSChan
statsConn = statS
internalCdrStatSChan <- statS
} else if len(cfg.CDRSStats) != 0 {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to stats server: %s", err.Error()))
exitChan <- true
return
}
statsConn = client
statsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB,
cfg.CDRSStatSConns, internalCdrStatSChan)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
return
}
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, raterConn, pubSubConn, usersConn, aliasesConn, statsConn)
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, ralConn, pubSubConn, usersConn, aliasesConn, statsConn)
cdrServer.SetTimeToLive(cfg.ResponseCacheTTL, nil)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHandlersToServer(server)
@@ -536,12 +395,8 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, acco
internalUserSChan <- userServer
}
func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder,
internalCdrSChan chan *engine.CdrServer,
internalCdrStatSChan chan rpcclient.RpcClientConnection,
internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan rpcclient.RpcClientConnection,
func startRpc(server *utils.Server, internalRaterChan chan rpcclient.RpcClientConnection,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan chan rpcclient.RpcClientConnection) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
@@ -562,6 +417,7 @@ func startRpc(server *utils.Server, internalRaterChan chan *engine.Responder,
go server.ServeJSON(cfg.RPCJSONListen)
go server.ServeGOB(cfg.RPCGOBListen)
go server.ServeHTTP(cfg.HTTPListen)
}
func writePid() {
@@ -616,7 +472,7 @@ func main() {
var logDb engine.LogStorage
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
if cfg.RaterEnabled || cfg.SchedulerEnabled { // Only connect to dataDb if necessary
if cfg.RaterEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
ratingDb, err = engine.ConfigureRatingStorage(cfg.TpDbType, cfg.TpDbHost, cfg.TpDbPort,
cfg.TpDbName, cfg.TpDbUser, cfg.TpDbPass, cfg.DBDataEncoding)
if err != nil { // Cannot configure getter database, show stopper
@@ -625,6 +481,8 @@ func main() {
}
defer ratingDb.Close()
engine.SetRatingStorage(ratingDb)
}
if cfg.RaterEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || cfg.AliasesServerEnabled || cfg.UserServerEnabled {
accountDb, err = engine.ConfigureAccountingStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding)
if err != nil { // Cannot configure getter database, show stopper
@@ -660,10 +518,10 @@ func main() {
// Define internal connections via channels
internalBalancerChan := make(chan *balancer2go.Balancer, 1)
internalRaterChan := make(chan *engine.Responder, 1)
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
cacheDoneChan := make(chan struct{}, 1)
internalSchedulerChan := make(chan *scheduler.Scheduler, 1)
internalCdrSChan := make(chan *engine.CdrServer, 1)
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalHistorySChan := make(chan rpcclient.RpcClientConnection, 1)
internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1)
@@ -727,7 +585,7 @@ func main() {
}
if cfg.DiameterAgentCfg().Enabled {
go startDiameterAgent(internalSMGChan, exitChan)
go startDiameterAgent(internalSMGChan, internalPubSubSChan, exitChan)
}
// Start HistoryS service

View File

@@ -40,7 +40,7 @@ func startBalancer(internalBalancerChan chan *balancer2go.Balancer, stopHandled
// Starts rater and reports on chan
func startRater(internalRaterChan chan *engine.Responder, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, internalBalancerChan chan *balancer2go.Balancer, internalSchedulerChan chan *scheduler.Scheduler,
internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
server *utils.Server,

View File

@@ -152,6 +152,17 @@ func main() {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "int") {
if err := migratorRC8acc.migrateAccountsInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8rat.migrateActionTriggersInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8rat.migrateActionsInt(); err != nil {
log.Print(err.Error())
}
}
log.Print("Done!")
return
}

View File

@@ -173,20 +173,20 @@ func (mig MigratorRC8) migrateAccounts() error {
}
// transfer data into new structurse
newAcc := &engine.Account{
Id: oldAcc.Id,
BalanceMap: make(map[string]engine.BalanceChain, len(oldAcc.BalanceMap)),
ID: oldAcc.Id,
BalanceMap: make(map[string]engine.Balances, len(oldAcc.BalanceMap)),
UnitCounters: make(engine.UnitCounters, len(oldAcc.UnitCounters)),
ActionTriggers: make(engine.ActionTriggers, len(oldAcc.ActionTriggers)),
AllowNegative: oldAcc.AllowNegative,
Disabled: oldAcc.Disabled,
}
// fix id
idElements := strings.Split(newAcc.Id, utils.CONCATENATED_KEY_SEP)
idElements := strings.Split(newAcc.ID, utils.CONCATENATED_KEY_SEP)
if len(idElements) != 3 {
log.Printf("Malformed account ID %s", oldAcc.Id)
continue
}
newAcc.Id = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
newAcc.ID = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
// balances
balanceErr := false
for oldBalKey, oldBalChain := range oldAcc.BalanceMap {
@@ -198,7 +198,7 @@ func (mig MigratorRC8) migrateAccounts() error {
}
newBalKey := "*" + keyElements[1]
newBalDirection := "*" + keyElements[2]
newAcc.BalanceMap[newBalKey] = make(engine.BalanceChain, len(oldBalChain))
newAcc.BalanceMap[newBalKey] = make(engine.Balances, len(oldBalChain))
for index, oldBal := range oldBalChain {
// check default to set new id
if oldBal.IsDefault() {
@@ -206,12 +206,12 @@ func (mig MigratorRC8) migrateAccounts() error {
}
newAcc.BalanceMap[newBalKey][index] = &engine.Balance{
Uuid: oldBal.Uuid,
Id: oldBal.Id,
ID: oldBal.Id,
Value: oldBal.Value,
Directions: utils.ParseStringMap(newBalDirection),
ExpirationDate: oldBal.ExpirationDate,
Weight: oldBal.Weight,
DestinationIds: utils.ParseStringMap(oldBal.DestinationIds),
DestinationIDs: utils.ParseStringMap(oldBal.DestinationIds),
RatingSubject: oldBal.RatingSubject,
Categories: utils.ParseStringMap(oldBal.Category),
SharedGroups: utils.ParseStringMap(oldBal.SharedGroup),
@@ -227,51 +227,108 @@ func (mig MigratorRC8) migrateAccounts() error {
// unit counters
for _, oldUc := range oldAcc.UnitCounters {
newUc := &engine.UnitCounter{
BalanceType: oldUc.BalanceType,
Balances: make(engine.BalanceChain, len(oldUc.Balances)),
Counters: make(engine.CounterFilters, len(oldUc.Balances)),
}
for index, oldUcBal := range oldUc.Balances {
newUc.Balances[index] = &engine.Balance{
Uuid: oldUcBal.Uuid,
Id: oldUcBal.Id,
Value: oldUcBal.Value,
Directions: utils.ParseStringMap(oldUc.Direction),
ExpirationDate: oldUcBal.ExpirationDate,
Weight: oldUcBal.Weight,
DestinationIds: utils.ParseStringMap(oldUcBal.DestinationIds),
RatingSubject: oldUcBal.RatingSubject,
Categories: utils.ParseStringMap(oldUcBal.Category),
SharedGroups: utils.ParseStringMap(oldUcBal.SharedGroup),
Timings: oldUcBal.Timings,
TimingIDs: utils.ParseStringMap(oldUcBal.TimingIDs),
Disabled: oldUcBal.Disabled,
bf := &engine.BalanceFilter{}
if oldUcBal.Uuid != "" {
bf.Uuid = utils.StringPointer(oldUcBal.Uuid)
}
if oldUcBal.Id != "" {
bf.ID = utils.StringPointer(oldUcBal.Id)
}
if oldUc.BalanceType != "" {
bf.Type = utils.StringPointer(oldUc.BalanceType)
}
// the value was used for counter value
/*if oldUcBal.Value != 0 {
bf.Value = utils.Float64Pointer(oldUcBal.Value)
}*/
if oldUc.Direction != "" {
bf.Directions = utils.StringMapPointer(utils.ParseStringMap(oldUc.Direction))
}
if !oldUcBal.ExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldUcBal.ExpirationDate)
}
if oldUcBal.Weight != 0 {
bf.Weight = utils.Float64Pointer(oldUcBal.Weight)
}
if oldUcBal.DestinationIds != "" {
bf.DestinationIDs = utils.StringMapPointer(utils.ParseStringMap(oldUcBal.DestinationIds))
}
if oldUcBal.RatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldUcBal.RatingSubject)
}
if oldUcBal.Category != "" {
bf.Categories = utils.StringMapPointer(utils.ParseStringMap(oldUcBal.Category))
}
if oldUcBal.SharedGroup != "" {
bf.SharedGroups = utils.StringMapPointer(utils.ParseStringMap(oldUcBal.SharedGroup))
}
if oldUcBal.TimingIDs != "" {
bf.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(oldUcBal.TimingIDs))
}
if oldUcBal.Disabled != false {
bf.Disabled = utils.BoolPointer(oldUcBal.Disabled)
}
bf.Timings = oldUcBal.Timings
cf := &engine.CounterFilter{
Value: oldUcBal.Value,
Filter: bf,
}
newUc.Counters[index] = cf
}
newAcc.UnitCounters[oldUc.BalanceType] = append(newAcc.UnitCounters[oldUc.BalanceType], newUc)
}
// action triggers
for index, oldAtr := range oldAcc.ActionTriggers {
newAcc.ActionTriggers[index] = &engine.ActionTrigger{
UniqueID: oldAtr.Id,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
BalanceId: oldAtr.BalanceId,
BalanceType: oldAtr.BalanceType,
BalanceDirections: utils.ParseStringMap(oldAtr.BalanceDirection),
BalanceDestinationIds: utils.ParseStringMap(oldAtr.BalanceDestinationIds),
BalanceWeight: oldAtr.BalanceWeight,
BalanceExpirationDate: oldAtr.BalanceExpirationDate,
BalanceTimingTags: utils.ParseStringMap(oldAtr.BalanceTimingTags),
BalanceRatingSubject: oldAtr.BalanceRatingSubject,
BalanceCategories: utils.ParseStringMap(oldAtr.BalanceCategory),
BalanceSharedGroups: utils.ParseStringMap(oldAtr.BalanceSharedGroup),
BalanceDisabled: oldAtr.BalanceDisabled,
Weight: oldAtr.Weight,
ActionsId: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
at := &engine.ActionTrigger{
UniqueID: oldAtr.Id,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
Weight: oldAtr.Weight,
ActionsID: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
}
bf := &engine.BalanceFilter{}
if oldAtr.BalanceId != "" {
bf.ID = utils.StringPointer(oldAtr.BalanceId)
}
if oldAtr.BalanceType != "" {
bf.Type = utils.StringPointer(oldAtr.BalanceType)
}
if oldAtr.BalanceRatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAtr.BalanceRatingSubject)
}
if oldAtr.BalanceDirection != "" {
bf.Directions = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceDirection))
}
if oldAtr.BalanceDestinationIds != "" {
bf.DestinationIDs = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceDestinationIds))
}
if oldAtr.BalanceTimingTags != "" {
bf.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceTimingTags))
}
if oldAtr.BalanceCategory != "" {
bf.Categories = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceCategory))
}
if oldAtr.BalanceSharedGroup != "" {
bf.SharedGroups = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceSharedGroup))
}
if oldAtr.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight)
}
if oldAtr.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled)
}
if !oldAtr.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate)
}
at.Balance = bf
newAcc.ActionTriggers[index] = at
if newAcc.ActionTriggers[index].ThresholdType == "*min_counter" ||
newAcc.ActionTriggers[index].ThresholdType == "*max_counter" {
newAcc.ActionTriggers[index].ThresholdType = strings.Replace(newAcc.ActionTriggers[index].ThresholdType, "_", "_event_", 1)
@@ -287,7 +344,7 @@ func (mig MigratorRC8) migrateAccounts() error {
if err != nil {
return err
}
if err := mig.db.Cmd("SET", utils.ACCOUNT_PREFIX+newAcc.Id, result).Err; err != nil {
if err := mig.db.Cmd("SET", utils.ACCOUNT_PREFIX+newAcc.ID, result).Err; err != nil {
return err
}
}
@@ -322,28 +379,53 @@ func (mig MigratorRC8) migrateActionTriggers() error {
}
newAtrs := make(engine.ActionTriggers, len(oldAtrs))
for index, oldAtr := range oldAtrs {
newAtrs[index] = &engine.ActionTrigger{
UniqueID: oldAtr.Id,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
BalanceId: oldAtr.BalanceId,
BalanceType: oldAtr.BalanceType,
BalanceDirections: utils.ParseStringMap(oldAtr.BalanceDirection),
BalanceDestinationIds: utils.ParseStringMap(oldAtr.BalanceDestinationIds),
BalanceWeight: oldAtr.BalanceWeight,
BalanceExpirationDate: oldAtr.BalanceExpirationDate,
BalanceTimingTags: utils.ParseStringMap(oldAtr.BalanceTimingTags),
BalanceRatingSubject: oldAtr.BalanceRatingSubject,
BalanceCategories: utils.ParseStringMap(oldAtr.BalanceCategory),
BalanceSharedGroups: utils.ParseStringMap(oldAtr.BalanceSharedGroup),
BalanceDisabled: oldAtr.BalanceDisabled,
Weight: oldAtr.Weight,
ActionsId: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
at := &engine.ActionTrigger{
UniqueID: oldAtr.Id,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
Weight: oldAtr.Weight,
ActionsID: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
}
bf := &engine.BalanceFilter{}
if oldAtr.BalanceId != "" {
bf.ID = utils.StringPointer(oldAtr.BalanceId)
}
if oldAtr.BalanceType != "" {
bf.Type = utils.StringPointer(oldAtr.BalanceType)
}
if oldAtr.BalanceRatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAtr.BalanceRatingSubject)
}
if oldAtr.BalanceDirection != "" {
bf.Directions = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceDirection))
}
if oldAtr.BalanceDestinationIds != "" {
bf.DestinationIDs = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceDestinationIds))
}
if oldAtr.BalanceTimingTags != "" {
bf.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceTimingTags))
}
if oldAtr.BalanceCategory != "" {
bf.Categories = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceCategory))
}
if oldAtr.BalanceSharedGroup != "" {
bf.SharedGroups = utils.StringMapPointer(utils.ParseStringMap(oldAtr.BalanceSharedGroup))
}
if oldAtr.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight)
}
if oldAtr.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled)
}
if !oldAtr.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate)
}
at.Balance = bf
newAtrs[index] = at
if newAtrs[index].ThresholdType == "*min_counter" ||
newAtrs[index].ThresholdType == "*max_counter" {
newAtrs[index].ThresholdType = strings.Replace(newAtrs[index].ThresholdType, "_", "_event_", 1)
@@ -381,29 +463,53 @@ func (mig MigratorRC8) migrateActions() error {
}
newAcs := make(engine.Actions, len(oldAcs))
for index, oldAc := range oldAcs {
newAcs[index] = &engine.Action{
a := &engine.Action{
Id: oldAc.Id,
ActionType: oldAc.ActionType,
BalanceType: oldAc.BalanceType,
ExtraParameters: oldAc.ExtraParameters,
ExpirationString: oldAc.ExpirationString,
Weight: oldAc.Weight,
Balance: &engine.Balance{
Uuid: oldAc.Balance.Uuid,
Id: oldAc.Balance.Id,
Value: oldAc.Balance.Value,
Directions: utils.ParseStringMap(oldAc.Direction),
ExpirationDate: oldAc.Balance.ExpirationDate,
Weight: oldAc.Balance.Weight,
DestinationIds: utils.ParseStringMap(oldAc.Balance.DestinationIds),
RatingSubject: oldAc.Balance.RatingSubject,
Categories: utils.ParseStringMap(oldAc.Balance.Category),
SharedGroups: utils.ParseStringMap(oldAc.Balance.SharedGroup),
Timings: oldAc.Balance.Timings,
TimingIDs: utils.ParseStringMap(oldAc.Balance.TimingIDs),
Disabled: oldAc.Balance.Disabled,
},
Balance: &engine.BalanceFilter{},
}
bf := a.Balance
if oldAc.Balance.Uuid != "" {
bf.Uuid = utils.StringPointer(oldAc.Balance.Uuid)
}
if oldAc.Balance.Id != "" {
bf.ID = utils.StringPointer(oldAc.Balance.Id)
}
if oldAc.BalanceType != "" {
bf.Type = utils.StringPointer(oldAc.BalanceType)
}
if oldAc.Balance.Value != 0 {
bf.Value = utils.Float64Pointer(oldAc.Balance.Value)
}
if oldAc.Balance.RatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAc.Balance.RatingSubject)
}
if oldAc.Balance.DestinationIds != "" {
bf.DestinationIDs = utils.StringMapPointer(utils.ParseStringMap(oldAc.Balance.DestinationIds))
}
if oldAc.Balance.TimingIDs != "" {
bf.TimingIDs = utils.StringMapPointer(utils.ParseStringMap(oldAc.Balance.TimingIDs))
}
if oldAc.Balance.Category != "" {
bf.Categories = utils.StringMapPointer(utils.ParseStringMap(oldAc.Balance.Category))
}
if oldAc.Balance.SharedGroup != "" {
bf.SharedGroups = utils.StringMapPointer(utils.ParseStringMap(oldAc.Balance.SharedGroup))
}
if oldAc.Balance.Weight != 0 {
bf.Weight = utils.Float64Pointer(oldAc.Balance.Weight)
}
if oldAc.Balance.Disabled != false {
bf.Disabled = utils.BoolPointer(oldAc.Balance.Disabled)
}
if !oldAc.Balance.ExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAc.Balance.ExpirationDate)
}
bf.Timings = oldAc.Balance.Timings
newAcs[index] = a
}
newAcsMap[key] = newAcs
}
@@ -475,7 +581,7 @@ func (mig MigratorRC8) migrateActionPlans() error {
// fix id
idElements := strings.Split(actionId, utils.CONCATENATED_KEY_SEP)
if len(idElements) != 3 {
log.Printf("Malformed account ID %s", actionId)
//log.Printf("Malformed account ID %s", actionId)
continue
}
apl.AccountIds[idx] = fmt.Sprintf("%s:%s", idElements[1], idElements[2])
@@ -491,14 +597,14 @@ func (mig MigratorRC8) migrateActionPlans() error {
if !exists {
newApl = &engine.ActionPlan{
Id: apl.Id,
AccountIDs: make(map[string]struct{}),
AccountIDs: make(utils.StringMap),
}
newAplMap[key] = newApl
}
if !apl.IsASAP() {
for _, accID := range apl.AccountIds {
if _, exists := newApl.AccountIDs[accID]; !exists {
newApl.AccountIDs[accID] = struct{}{}
newApl.AccountIDs[accID] = true
}
}
}

View File

@@ -0,0 +1,409 @@
package main
import (
"log"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type Account1 struct {
Id string
BalanceMap map[string]BalanceChain1
UnitCounters UnitCounters1
ActionTriggers ActionTriggers1
AllowNegative bool
Disabled bool
}
type Balance1 struct {
Uuid string //system wide unique
Id string // account wide unique
Value float64
Directions utils.StringMap
ExpirationDate time.Time
Weight float64
DestinationIds utils.StringMap
RatingSubject string
Categories utils.StringMap
SharedGroups utils.StringMap
Timings []*engine.RITiming
TimingIDs utils.StringMap
Disabled bool
Factor engine.ValueFactor
Blocker bool
precision int
account *Account // used to store ub reference for shared balances
dirty bool
}
type BalanceChain1 []*Balance1
type UnitCounter1 struct {
BalanceType string // *monetary/*voice/*sms/etc
CounterType string // *event or *balance
Balances BalanceChain1 // first balance is the general one (no destination)
}
type UnitCounters1 []*UnitCounter1
type Action1 struct {
Id string
ActionType string
BalanceType string
ExtraParameters string
Filter string
ExpirationString string // must stay as string because it can have relative values like 1month
Weight float64
Balance *Balance1
}
type Actions1 []*Action1
type ActionTrigger1 struct {
ID string // original csv tag
UniqueID string // individual id
ThresholdType string
ThresholdValue float64
Recurrent bool // reset eexcuted flag each run
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
BalanceId string
BalanceType string // *monetary/*voice etc
BalanceDirections utils.StringMap // filter for balance
BalanceDestinationIds utils.StringMap // filter for balance
BalanceWeight float64 // filter for balance
BalanceExpirationDate time.Time // filter for balance
BalanceTimingTags utils.StringMap // filter for balance
BalanceRatingSubject string // filter for balance
BalanceCategories utils.StringMap // filter for balance
BalanceSharedGroups utils.StringMap // filter for balance
BalanceBlocker bool
BalanceDisabled bool // filter for balance
Weight float64
ActionsId string
MinQueuedItems int // Trigger actions only if this number is hit (stats only)
Executed bool
}
type ActionTriggers1 []*ActionTrigger1
func (mig MigratorRC8) migrateAccountsInt() error {
keys, err := mig.db.Cmd("KEYS", utils.ACCOUNT_PREFIX+"*").List()
if err != nil {
return err
}
newAccounts := make([]*engine.Account, 0)
var migratedKeys []string
// get existing accounts
for _, key := range keys {
log.Printf("Migrating account: %s...", key)
values, err := mig.db.Cmd("GET", key).Bytes()
if err != nil {
continue
}
var oldAcc Account1
if err = mig.ms.Unmarshal(values, &oldAcc); err != nil {
return err
}
// transfer data into new structurse
newAcc := &engine.Account{
ID: oldAcc.Id,
BalanceMap: make(map[string]engine.Balances, len(oldAcc.BalanceMap)),
UnitCounters: make(engine.UnitCounters),
ActionTriggers: make(engine.ActionTriggers, len(oldAcc.ActionTriggers)),
AllowNegative: oldAcc.AllowNegative,
Disabled: oldAcc.Disabled,
}
// balances
balanceErr := false
for key, oldBalChain := range oldAcc.BalanceMap {
newAcc.BalanceMap[key] = make(engine.Balances, len(oldBalChain))
for index, oldBal := range oldBalChain {
newAcc.BalanceMap[key][index] = &engine.Balance{
Uuid: oldBal.Uuid,
ID: oldBal.Id,
Value: oldBal.Value,
Directions: oldBal.Directions,
ExpirationDate: oldBal.ExpirationDate,
Weight: oldBal.Weight,
DestinationIDs: oldBal.DestinationIds,
RatingSubject: oldBal.RatingSubject,
Categories: oldBal.Categories,
SharedGroups: oldBal.SharedGroups,
Timings: oldBal.Timings,
TimingIDs: oldBal.TimingIDs,
Disabled: oldBal.Disabled,
Factor: oldBal.Factor,
Blocker: oldBal.Blocker,
}
}
}
if balanceErr {
continue
}
// unit counters
for _, oldUc := range oldAcc.UnitCounters {
newUc := &engine.UnitCounter{
Counters: make(engine.CounterFilters, len(oldUc.Balances)),
}
for index, oldUcBal := range oldUc.Balances {
b := &engine.Balance{
Uuid: oldUcBal.Uuid,
ID: oldUcBal.Id,
Value: oldUcBal.Value,
Directions: oldUcBal.Directions,
ExpirationDate: oldUcBal.ExpirationDate,
Weight: oldUcBal.Weight,
DestinationIDs: oldUcBal.DestinationIds,
RatingSubject: oldUcBal.RatingSubject,
Categories: oldUcBal.Categories,
SharedGroups: oldUcBal.SharedGroups,
Timings: oldUcBal.Timings,
TimingIDs: oldUcBal.TimingIDs,
Disabled: oldUcBal.Disabled,
Factor: oldUcBal.Factor,
Blocker: oldUcBal.Blocker,
}
bf := &engine.BalanceFilter{}
bf.LoadFromBalance(b)
cf := &engine.CounterFilter{
Value: oldUcBal.Value,
Filter: bf,
}
newUc.Counters[index] = cf
}
newAcc.UnitCounters[oldUc.BalanceType] = append(newAcc.UnitCounters[oldUc.BalanceType], newUc)
}
// action triggers
for index, oldAtr := range oldAcc.ActionTriggers {
at := &engine.ActionTrigger{
ID: oldAtr.ID,
UniqueID: oldAtr.UniqueID,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
Weight: oldAtr.Weight,
ActionsID: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
}
bf := &engine.BalanceFilter{}
if oldAtr.BalanceId != "" {
bf.ID = utils.StringPointer(oldAtr.BalanceId)
}
if oldAtr.BalanceType != "" {
bf.Type = utils.StringPointer(oldAtr.BalanceType)
}
if oldAtr.BalanceRatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAtr.BalanceRatingSubject)
}
if !oldAtr.BalanceDirections.IsEmpty() {
bf.Directions = utils.StringMapPointer(oldAtr.BalanceDirections)
}
if !oldAtr.BalanceDestinationIds.IsEmpty() {
bf.DestinationIDs = utils.StringMapPointer(oldAtr.BalanceDestinationIds)
}
if !oldAtr.BalanceTimingTags.IsEmpty() {
bf.TimingIDs = utils.StringMapPointer(oldAtr.BalanceTimingTags)
}
if !oldAtr.BalanceCategories.IsEmpty() {
bf.Categories = utils.StringMapPointer(oldAtr.BalanceCategories)
}
if !oldAtr.BalanceSharedGroups.IsEmpty() {
bf.SharedGroups = utils.StringMapPointer(oldAtr.BalanceSharedGroups)
}
if oldAtr.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight)
}
if oldAtr.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled)
}
if !oldAtr.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate)
}
at.Balance = bf
newAcc.ActionTriggers[index] = at
}
newAcc.InitCounters()
newAccounts = append(newAccounts, newAcc)
migratedKeys = append(migratedKeys, key)
}
// write data back
for _, newAcc := range newAccounts {
result, err := mig.ms.Marshal(newAcc)
if err != nil {
return err
}
if err := mig.db.Cmd("SET", utils.ACCOUNT_PREFIX+newAcc.ID, result).Err; err != nil {
return err
}
}
notMigrated := len(keys) - len(migratedKeys)
if notMigrated > 0 {
log.Printf("WARNING: there are %d accounts that failed migration!", notMigrated)
}
return err
}
func (mig MigratorRC8) migrateActionTriggersInt() error {
keys, err := mig.db.Cmd("KEYS", utils.ACTION_TRIGGER_PREFIX+"*").List()
if err != nil {
return err
}
newAtrsMap := make(map[string]engine.ActionTriggers, len(keys))
for _, key := range keys {
log.Printf("Migrating action trigger: %s...", key)
var oldAtrs ActionTriggers1
var values []byte
if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil {
if err := mig.ms.Unmarshal(values, &oldAtrs); err != nil {
return err
}
}
newAtrs := make(engine.ActionTriggers, len(oldAtrs))
for index, oldAtr := range oldAtrs {
at := &engine.ActionTrigger{
ID: oldAtr.ID,
UniqueID: oldAtr.UniqueID,
ThresholdType: oldAtr.ThresholdType,
ThresholdValue: oldAtr.ThresholdValue,
Recurrent: oldAtr.Recurrent,
MinSleep: oldAtr.MinSleep,
Weight: oldAtr.Weight,
ActionsID: oldAtr.ActionsId,
MinQueuedItems: oldAtr.MinQueuedItems,
Executed: oldAtr.Executed,
}
bf := &engine.BalanceFilter{}
if oldAtr.BalanceId != "" {
bf.ID = utils.StringPointer(oldAtr.BalanceId)
}
if oldAtr.BalanceType != "" {
bf.Type = utils.StringPointer(oldAtr.BalanceType)
}
if oldAtr.BalanceRatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAtr.BalanceRatingSubject)
}
if !oldAtr.BalanceDirections.IsEmpty() {
bf.Directions = utils.StringMapPointer(oldAtr.BalanceDirections)
}
if !oldAtr.BalanceDestinationIds.IsEmpty() {
bf.DestinationIDs = utils.StringMapPointer(oldAtr.BalanceDestinationIds)
}
if !oldAtr.BalanceTimingTags.IsEmpty() {
bf.TimingIDs = utils.StringMapPointer(oldAtr.BalanceTimingTags)
}
if !oldAtr.BalanceCategories.IsEmpty() {
bf.Categories = utils.StringMapPointer(oldAtr.BalanceCategories)
}
if !oldAtr.BalanceSharedGroups.IsEmpty() {
bf.SharedGroups = utils.StringMapPointer(oldAtr.BalanceSharedGroups)
}
if oldAtr.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight)
}
if oldAtr.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled)
}
if !oldAtr.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate)
}
at.Balance = bf
newAtrs[index] = at
}
newAtrsMap[key] = newAtrs
}
// write data back
for key, atrs := range newAtrsMap {
result, err := mig.ms.Marshal(&atrs)
if err != nil {
return err
}
if err = mig.db.Cmd("SET", key, result).Err; err != nil {
return err
}
}
return nil
}
func (mig MigratorRC8) migrateActionsInt() error {
keys, err := mig.db.Cmd("KEYS", utils.ACTION_PREFIX+"*").List()
if err != nil {
return err
}
newAcsMap := make(map[string]engine.Actions, len(keys))
for _, key := range keys {
log.Printf("Migrating action: %s...", key)
var oldAcs Actions1
var values []byte
if values, err = mig.db.Cmd("GET", key).Bytes(); err == nil {
if err := mig.ms.Unmarshal(values, &oldAcs); err != nil {
return err
}
}
newAcs := make(engine.Actions, len(oldAcs))
for index, oldAc := range oldAcs {
a := &engine.Action{
Id: oldAc.Id,
ActionType: oldAc.ActionType,
ExtraParameters: oldAc.ExtraParameters,
ExpirationString: oldAc.ExpirationString,
Filter: oldAc.Filter,
Weight: oldAc.Weight,
Balance: &engine.BalanceFilter{},
}
bf := a.Balance
if oldAc.Balance.Uuid != "" {
bf.Uuid = utils.StringPointer(oldAc.Balance.Uuid)
}
if oldAc.Balance.Id != "" {
bf.ID = utils.StringPointer(oldAc.Balance.Id)
}
if oldAc.BalanceType != "" {
bf.Type = utils.StringPointer(oldAc.BalanceType)
}
if oldAc.Balance.Value != 0 {
bf.Value = utils.Float64Pointer(oldAc.Balance.Value)
}
if oldAc.Balance.RatingSubject != "" {
bf.RatingSubject = utils.StringPointer(oldAc.Balance.RatingSubject)
}
if !oldAc.Balance.DestinationIds.IsEmpty() {
bf.DestinationIDs = utils.StringMapPointer(oldAc.Balance.DestinationIds)
}
if !oldAc.Balance.TimingIDs.IsEmpty() {
bf.TimingIDs = utils.StringMapPointer(oldAc.Balance.TimingIDs)
}
if !oldAc.Balance.Categories.IsEmpty() {
bf.Categories = utils.StringMapPointer(oldAc.Balance.Categories)
}
if !oldAc.Balance.SharedGroups.IsEmpty() {
bf.SharedGroups = utils.StringMapPointer(oldAc.Balance.SharedGroups)
}
if oldAc.Balance.Weight != 0 {
bf.Weight = utils.Float64Pointer(oldAc.Balance.Weight)
}
if oldAc.Balance.Disabled != false {
bf.Disabled = utils.BoolPointer(oldAc.Balance.Disabled)
}
if !oldAc.Balance.ExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAc.Balance.ExpirationDate)
}
bf.Timings = oldAc.Balance.Timings
newAcs[index] = a
}
newAcsMap[key] = newAcs
}
// write data back
for key, acs := range newAcsMap {
result, err := mig.ms.Marshal(&acs)
if err != nil {
return err
}
if err = mig.db.Cmd("SET", key, result).Err; err != nil {
return err
}
}
return nil
}