mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
EventReader with CSV in alpha
This commit is contained in:
@@ -31,7 +31,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func newAgentRequest(req config.DataProvider,
|
||||
func NewAgentRequest(req config.DataProvider,
|
||||
vars map[string]interface{},
|
||||
cgrRply *config.NavigableMap,
|
||||
rply *config.NavigableMap,
|
||||
@@ -50,16 +50,16 @@ func newAgentRequest(req config.DataProvider,
|
||||
CGRRequest: config.NewNavigableMap(nil),
|
||||
CGRReply: cgrRply,
|
||||
Reply: rply,
|
||||
timezone: timezone,
|
||||
Timezone: timezone,
|
||||
filterS: filterS,
|
||||
}
|
||||
// populate tenant
|
||||
if tntIf, err := ar.ParseField(
|
||||
&config.FCTemplate{Type: utils.META_COMPOSED,
|
||||
Value: tntTpl}); err == nil && tntIf.(string) != "" {
|
||||
ar.tenant = tntIf.(string)
|
||||
ar.Tenant = tntIf.(string)
|
||||
} else {
|
||||
ar.tenant = dfltTenant
|
||||
ar.Tenant = dfltTenant
|
||||
}
|
||||
|
||||
return
|
||||
@@ -74,8 +74,8 @@ type AgentRequest struct {
|
||||
CGRReply *config.NavigableMap
|
||||
CGRAReq *config.NavigableMap // Used to acces live build in request; both available as active request and active reply
|
||||
Reply *config.NavigableMap
|
||||
tenant,
|
||||
timezone string
|
||||
Tenant,
|
||||
Timezone string
|
||||
filterS *engine.FilterS
|
||||
}
|
||||
|
||||
@@ -127,7 +127,7 @@ func (ar *AgentRequest) AsNavigableMap(tplFlds []*config.FCTemplate) (
|
||||
nM *config.NavigableMap, err error) {
|
||||
ar.CGRAReq = config.NewNavigableMap(nil)
|
||||
for _, tplFld := range tplFlds {
|
||||
if pass, err := ar.filterS.Pass(ar.tenant,
|
||||
if pass, err := ar.filterS.Pass(ar.Tenant,
|
||||
tplFld.Filters, ar); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
@@ -207,11 +207,11 @@ func (aReq *AgentRequest) ParseField(
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
tEnd, err := utils.ParseTimeDetectLayout(strVal1, aReq.timezone)
|
||||
tEnd, err := utils.ParseTimeDetectLayout(strVal1, aReq.Timezone)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
tStart, err := utils.ParseTimeDetectLayout(strVal2, aReq.timezone)
|
||||
tStart, err := utils.ParseTimeDetectLayout(strVal2, aReq.Timezone)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestAgReqAsNavigableMap(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.CGRID},
|
||||
utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
|
||||
@@ -137,7 +137,7 @@ func TestAgReqMaxCost(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false)
|
||||
|
||||
@@ -182,7 +182,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "MandatoryFalse",
|
||||
@@ -232,7 +232,7 @@ func TestAgReqParseFieldRadius(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "MandatoryFalse",
|
||||
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
|
||||
@@ -272,7 +272,7 @@ Host: api.cgrates.org
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "MandatoryFalse",
|
||||
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
|
||||
@@ -343,7 +343,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "MandatoryFalse",
|
||||
FieldId: "MandatoryFalse", Type: utils.META_COMPOSED,
|
||||
@@ -371,7 +371,7 @@ func TestAgReqEmptyFilter(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.CGRID},
|
||||
utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()),
|
||||
@@ -414,7 +414,7 @@ func TestAgReqMetaExponent(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq.CGRRequest.Set([]string{"Value"}, "2", false, false)
|
||||
agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false)
|
||||
|
||||
@@ -440,7 +440,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
@@ -483,7 +483,7 @@ func TestAgReqFieldAsNone(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
|
||||
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
|
||||
@@ -520,7 +520,7 @@ func TestAgReqFieldAsNone2(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
|
||||
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
|
||||
@@ -560,7 +560,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false)
|
||||
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
|
||||
@@ -617,7 +617,7 @@ func TestAgReqFieldAsInterface(t *testing.T) {
|
||||
dm := engine.NewDataManager(data)
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
// populate request, emulating the way will be done in HTTPAgent
|
||||
agReq.CGRAReq = config.NewNavigableMap(nil)
|
||||
agReq.CGRAReq.Set([]string{utils.Usage}, []*config.NMItem{{Data: 3 * time.Minute}}, false, false)
|
||||
@@ -683,7 +683,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) {
|
||||
}
|
||||
cgrRply := config.NewNavigableMap(ev2)
|
||||
|
||||
agReq := newAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "Fld1",
|
||||
@@ -724,7 +724,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) {
|
||||
}
|
||||
rply := config.NewNavigableMap(ev)
|
||||
|
||||
agReq := newAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
|
||||
|
||||
agReq.setCGRReply(nil, utils.ErrNotFound)
|
||||
|
||||
@@ -773,7 +773,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) {
|
||||
utils.Error: "",
|
||||
}
|
||||
|
||||
agReq := newAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS)
|
||||
|
||||
agReq.setCGRReply(myEv, nil)
|
||||
|
||||
@@ -818,7 +818,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "CCUsage", Filters: []string{},
|
||||
@@ -896,7 +896,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "Usage", Filters: []string{},
|
||||
@@ -962,7 +962,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "Sum", Filters: []string{},
|
||||
@@ -1006,7 +1006,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "Diff", Filters: []string{},
|
||||
@@ -1050,7 +1050,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) {
|
||||
cfg, _ := config.NewDefaultCGRConfig()
|
||||
filterS := engine.NewFilterS(cfg, nil, nil, nil, dm)
|
||||
//pass the data provider to agent request
|
||||
agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS)
|
||||
|
||||
tplFlds := []*config.FCTemplate{
|
||||
&config.FCTemplate{Tag: "ValExp", Filters: []string{},
|
||||
|
||||
@@ -208,7 +208,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
|
||||
var lclProcessed bool
|
||||
lclProcessed, err = da.processRequest(
|
||||
reqProcessor,
|
||||
newAgentRequest(
|
||||
NewAgentRequest(
|
||||
diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(reqProcessor.Timezone,
|
||||
@@ -248,14 +248,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
|
||||
|
||||
func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor,
|
||||
agReq *AgentRequest) (processed bool, err error) {
|
||||
if pass, err := da.filterS.Pass(agReq.tenant,
|
||||
if pass, err := da.filterS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
}
|
||||
if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil {
|
||||
return
|
||||
}
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep)
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
@@ -442,7 +442,7 @@ func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
|
||||
return utils.ErrMandatoryIeMissing
|
||||
}
|
||||
dmd := msg.(*diamMsgData)
|
||||
aReq := newAgentRequest(
|
||||
aReq := NewAgentRequest(
|
||||
newDADataProvider(dmd.c, dmd.m),
|
||||
dmd.vars,
|
||||
config.NewNavigableMap(nil),
|
||||
|
||||
@@ -132,7 +132,7 @@ func TestProcessRequest(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
reqProcessor.Flags, _ = utils.FlagsWithParamsFromSlice([]string{utils.MetaAuth, utils.MetaAccounts})
|
||||
agReq := newAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
agReq := NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, filters)
|
||||
da := &DiameterAgent{
|
||||
@@ -220,7 +220,7 @@ func TestProcessRequest(t *testing.T) {
|
||||
cgrRplyNM = config.NewNavigableMap(nil)
|
||||
rply = config.NewNavigableMap(nil)
|
||||
|
||||
agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, filters)
|
||||
da = &DiameterAgent{
|
||||
@@ -308,7 +308,7 @@ func TestProcessRequest(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, filters)
|
||||
da = &DiameterAgent{
|
||||
@@ -410,7 +410,7 @@ func TestProcessRequest(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, filters)
|
||||
da = &DiameterAgent{
|
||||
@@ -498,7 +498,7 @@ func TestProcessRequest(t *testing.T) {
|
||||
},
|
||||
}}
|
||||
|
||||
agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply,
|
||||
reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone, filters)
|
||||
da = &DiameterAgent{
|
||||
|
||||
@@ -101,7 +101,7 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
|
||||
var lclProcessed bool
|
||||
lclProcessed, err = da.processRequest(
|
||||
reqProcessor,
|
||||
newAgentRequest(
|
||||
NewAgentRequest(
|
||||
dnsDP, reqVars, cgrRplyNM, rplyNM,
|
||||
reqProcessor.Tenant,
|
||||
da.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
@@ -150,14 +150,14 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
|
||||
|
||||
func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
|
||||
agReq *AgentRequest) (processed bool, err error) {
|
||||
if pass, err := da.fltrS.Pass(agReq.tenant,
|
||||
if pass, err := da.fltrS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
}
|
||||
if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil {
|
||||
return
|
||||
}
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep)
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
|
||||
@@ -61,7 +61,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
cgrRplyNM := config.NewNavigableMap(nil)
|
||||
rplyNM := config.NewNavigableMap(nil)
|
||||
for _, reqProcessor := range ha.reqProcessors {
|
||||
agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
reqProcessor.Tenant, ha.dfltTenant,
|
||||
utils.FirstNonEmpty(reqProcessor.Timezone,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone),
|
||||
@@ -98,14 +98,14 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// processRequest represents one processor processing the request
|
||||
func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor,
|
||||
agReq *AgentRequest) (processed bool, err error) {
|
||||
if pass, err := ha.filterS.Pass(agReq.tenant,
|
||||
if pass, err := ha.filterS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
}
|
||||
if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil {
|
||||
return
|
||||
}
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep)
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
|
||||
@@ -477,7 +477,7 @@ func diamErr(m *diam.Message, resCode uint32,
|
||||
reqVars map[string]interface{},
|
||||
tpl []*config.FCTemplate, tnt, tmz string,
|
||||
filterS *engine.FilterS) (a *diam.Message, err error) {
|
||||
aReq := newAgentRequest(
|
||||
aReq := NewAgentRequest(
|
||||
newDADataProvider(nil, m), reqVars,
|
||||
config.NewNavigableMap(nil),
|
||||
config.NewNavigableMap(nil),
|
||||
|
||||
@@ -96,7 +96,7 @@ func TestRadComposedFieldValue(t *testing.T) {
|
||||
if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
|
||||
@@ -116,7 +116,7 @@ func TestRadFieldOutVal(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart)
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
|
||||
@@ -137,7 +137,7 @@ func TestRadReplyAppendAttributes(t *testing.T) {
|
||||
&config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED,
|
||||
Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true, utils.INFIELD_SEP)},
|
||||
}
|
||||
agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil)
|
||||
agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false, false)
|
||||
agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false, false)
|
||||
agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false, false)
|
||||
|
||||
@@ -79,7 +79,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
|
||||
rplyNM := config.NewNavigableMap(nil)
|
||||
var processed bool
|
||||
for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors {
|
||||
agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(reqProcessor.Timezone,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone),
|
||||
@@ -116,7 +116,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e
|
||||
rplyNM := config.NewNavigableMap(nil)
|
||||
var processed bool
|
||||
for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors {
|
||||
agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM,
|
||||
reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(reqProcessor.Timezone,
|
||||
config.CgrConfig().GeneralCfg().DefaultTimezone),
|
||||
@@ -144,14 +144,14 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e
|
||||
// processRequest represents one processor processing the request
|
||||
func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor,
|
||||
agReq *AgentRequest, rply *radigo.Packet) (processed bool, err error) {
|
||||
if pass, err := ra.filterS.Pass(agReq.tenant,
|
||||
if pass, err := ra.filterS.Pass(agReq.Tenant,
|
||||
reqProcessor.Filters, agReq); err != nil || !pass {
|
||||
return pass, err
|
||||
}
|
||||
if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil {
|
||||
return
|
||||
}
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
|
||||
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep)
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
|
||||
@@ -191,7 +191,7 @@ func TestCgrCfgCDRC(t *testing.T) {
|
||||
{
|
||||
"id": "*default",
|
||||
"enabled": true, // enable CDR client functionality
|
||||
"content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"Content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"field_id": "ToR", "type": "*composed", "value": "~7:s/^(voice|data|sms|mms|generic)$/*$1/"},
|
||||
{"field_id": "AnswerTime", "type": "*composed", "value": "~1"},
|
||||
{"field_id": "Usage", "type": "*composed", "value": "~9:s/^(\\d+)$/${1}s/"},
|
||||
@@ -1826,8 +1826,8 @@ func TestCgrCdfEventReader(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: []string{},
|
||||
Flags: utils.FlagsWithParams{},
|
||||
Header_fields: make([]*FCTemplate, 0),
|
||||
Content_fields: []*FCTemplate{
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -1851,7 +1851,7 @@ func TestCgrCdfEventReader(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
Trailer_fields: make([]*FCTemplate, 0),
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -1875,8 +1875,8 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: nil,
|
||||
Flags: utils.FlagsWithParams{},
|
||||
Header_fields: make([]*FCTemplate, 0),
|
||||
Content_fields: []*FCTemplate{
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -1900,7 +1900,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
Trailer_fields: make([]*FCTemplate, 0),
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.dfltEvRdr, eCfg) {
|
||||
t.Errorf("received: %+v,\n expecting: %+v", utils.ToJSON(cgrCfg.dfltEvRdr), utils.ToJSON(eCfg))
|
||||
|
||||
@@ -91,9 +91,9 @@ type EventReaderCfg struct {
|
||||
Timezone string
|
||||
Filters []string
|
||||
Flags utils.FlagsWithParams
|
||||
Header_fields []*FCTemplate
|
||||
Content_fields []*FCTemplate
|
||||
Trailer_fields []*FCTemplate
|
||||
HeaderFields []*FCTemplate
|
||||
ContentFields []*FCTemplate
|
||||
TrailerFields []*FCTemplate
|
||||
Continue bool
|
||||
}
|
||||
|
||||
@@ -148,17 +148,17 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
|
||||
}
|
||||
}
|
||||
if jsnCfg.Header_fields != nil {
|
||||
if er.Header_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil {
|
||||
if er.HeaderFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Content_fields != nil {
|
||||
if er.Content_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Content_fields, sep); err != nil {
|
||||
if er.ContentFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Content_fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Trailer_fields != nil {
|
||||
if er.Trailer_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Trailer_fields, sep); err != nil {
|
||||
if er.TrailerFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Trailer_fields, sep); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -195,17 +195,17 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) {
|
||||
}
|
||||
}
|
||||
cln.Flags = er.Flags
|
||||
cln.Header_fields = make([]*FCTemplate, len(er.Header_fields))
|
||||
for idx, fld := range er.Header_fields {
|
||||
cln.Header_fields[idx] = fld.Clone()
|
||||
cln.HeaderFields = make([]*FCTemplate, len(er.HeaderFields))
|
||||
for idx, fld := range er.HeaderFields {
|
||||
cln.HeaderFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.Content_fields = make([]*FCTemplate, len(er.Content_fields))
|
||||
for idx, fld := range er.Content_fields {
|
||||
cln.Content_fields[idx] = fld.Clone()
|
||||
cln.ContentFields = make([]*FCTemplate, len(er.ContentFields))
|
||||
for idx, fld := range er.ContentFields {
|
||||
cln.ContentFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.Trailer_fields = make([]*FCTemplate, len(er.Trailer_fields))
|
||||
for idx, fld := range er.Trailer_fields {
|
||||
cln.Trailer_fields[idx] = fld.Clone()
|
||||
cln.TrailerFields = make([]*FCTemplate, len(er.TrailerFields))
|
||||
for idx, fld := range er.TrailerFields {
|
||||
cln.TrailerFields[idx] = fld.Clone()
|
||||
}
|
||||
cln.Continue = er.Continue
|
||||
return
|
||||
|
||||
@@ -27,14 +27,14 @@ import (
|
||||
|
||||
func TestEventRedearClone(t *testing.T) {
|
||||
orig := &EventReaderCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
SourceID: "RandomSource",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
Header_fields: []*FCTemplate{},
|
||||
Content_fields: []*FCTemplate{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
SourceID: "RandomSource",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
HeaderFields: []*FCTemplate{},
|
||||
ContentFields: []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -50,22 +50,22 @@ func TestEventRedearClone(t *testing.T) {
|
||||
Mandatory: true,
|
||||
},
|
||||
},
|
||||
Trailer_fields: []*FCTemplate{},
|
||||
Continue: true,
|
||||
TrailerFields: []*FCTemplate{},
|
||||
Continue: true,
|
||||
}
|
||||
cloned := orig.Clone()
|
||||
if !reflect.DeepEqual(cloned, orig) {
|
||||
t.Errorf("expected: %s \n,received: %s", utils.ToJSON(orig), utils.ToJSON(cloned))
|
||||
}
|
||||
initialOrig := &EventReaderCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
SourceID: "RandomSource",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
Header_fields: []*FCTemplate{},
|
||||
Content_fields: []*FCTemplate{
|
||||
ID: utils.MetaDefault,
|
||||
Type: "RandomType",
|
||||
FieldSep: ",",
|
||||
SourceID: "RandomSource",
|
||||
Filters: []string{"Filter1", "Filter2"},
|
||||
Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP),
|
||||
HeaderFields: []*FCTemplate{},
|
||||
ContentFields: []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -81,11 +81,11 @@ func TestEventRedearClone(t *testing.T) {
|
||||
Mandatory: true,
|
||||
},
|
||||
},
|
||||
Trailer_fields: []*FCTemplate{},
|
||||
Continue: true,
|
||||
TrailerFields: []*FCTemplate{},
|
||||
Continue: true,
|
||||
}
|
||||
orig.Filters = []string{"SingleFilter"}
|
||||
orig.Content_fields = []*FCTemplate{
|
||||
orig.ContentFields = []*FCTemplate{
|
||||
{
|
||||
Tag: "TOR",
|
||||
FieldId: "ToR",
|
||||
@@ -122,8 +122,8 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: nil,
|
||||
Flags: utils.FlagsWithParams{},
|
||||
Header_fields: make([]*FCTemplate, 0),
|
||||
Content_fields: []*FCTemplate{
|
||||
HeaderFields: make([]*FCTemplate, 0),
|
||||
ContentFields: []*FCTemplate{
|
||||
{Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED,
|
||||
@@ -147,7 +147,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) {
|
||||
{Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED,
|
||||
Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
Trailer_fields: make([]*FCTemplate, 0),
|
||||
TrailerFields: make([]*FCTemplate, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
36
ers/ers.go
36
ers/ers.go
@@ -66,8 +66,8 @@ type ERService struct {
|
||||
|
||||
// ListenAndServe keeps the service alive
|
||||
func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
|
||||
for _, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
if err = erS.addReader(rdrCfg); err != nil {
|
||||
for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
if err = erS.addReader(rdrCfg.ID, cfgIdx); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> adding reader <%s> got error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
@@ -92,14 +92,16 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
|
||||
}
|
||||
|
||||
// addReader will add a new reader to the service
|
||||
func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) {
|
||||
erS.stopLsn[rdrCfg.ID] = make(chan struct{})
|
||||
func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) {
|
||||
erS.stopLsn[rdrID] = make(chan struct{})
|
||||
var rdr EventReader
|
||||
if rdr, err = NewEventReader(rdrCfg, erS.stopLsn[rdrCfg.ID], erS.exitChan); err != nil {
|
||||
if rdr, err = NewEventReader(erS.cfg, cfgIdx,
|
||||
erS.rdrEvents, erS.filterS,
|
||||
erS.stopLsn[rdrID], erS.exitChan); err != nil {
|
||||
return
|
||||
}
|
||||
erS.rdrs[rdrCfg.ID] = rdr
|
||||
return rdr.Init()
|
||||
erS.rdrs[rdrID] = rdr
|
||||
return rdr.Serve()
|
||||
}
|
||||
|
||||
// handleReloads will handle the config reloads which are signaled over cfgRldChan
|
||||
@@ -109,17 +111,19 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
case <-erS.exitChan:
|
||||
return
|
||||
case <-cfgRldChan:
|
||||
cfgIDs := make(map[string]*config.EventReaderCfg)
|
||||
cfgIDs := make(map[string]int)
|
||||
pathReloaded := make(map[string]struct{})
|
||||
// index config IDs
|
||||
for _, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
cfgIDs[rdrCfg.ID] = rdrCfg
|
||||
for i, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
cfgIDs[rdrCfg.ID] = i
|
||||
}
|
||||
erS.Lock()
|
||||
// remove the necessary ids
|
||||
for id := range erS.rdrs {
|
||||
if newCfg, has := cfgIDs[id]; has { // still present
|
||||
if newCfg.SourcePath == erS.rdrPaths[id] {
|
||||
for id, rdr := range erS.rdrs {
|
||||
if cfgIdx, has := cfgIDs[id]; has { // still present
|
||||
newCfg := erS.cfg.ERsCfg().Readers[cfgIdx]
|
||||
if newCfg.SourcePath == erS.rdrPaths[id] &&
|
||||
newCfg.ID == rdr.Config().ID { // make sure the index did not change
|
||||
continue
|
||||
}
|
||||
pathReloaded[id] = struct{}{}
|
||||
@@ -129,16 +133,16 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
delete(erS.stopLsn, id)
|
||||
}
|
||||
// add new ids
|
||||
for id, rdrCfg := range cfgIDs {
|
||||
for id, rdrIdx := range cfgIDs {
|
||||
if _, has := erS.rdrs[id]; has {
|
||||
if _, has := pathReloaded[id]; !has {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := erS.addReader(rdrCfg); err != nil {
|
||||
if err := erS.addReader(id, rdrIdx); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> adding reader <%s> got error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
utils.ERs, id, err.Error()))
|
||||
erS.exitChan <- true
|
||||
}
|
||||
}
|
||||
|
||||
162
ers/filecsv.go
162
ers/filecsv.go
@@ -19,42 +19,62 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package ers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/agents"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewCSVFileER(cfg *config.EventReaderCfg,
|
||||
func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents chan *erEvent, fltrS *engine.FilterS,
|
||||
rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
|
||||
srcPath := cfg.SourcePath
|
||||
srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath
|
||||
if strings.HasSuffix(srcPath, utils.Slash) {
|
||||
srcPath = srcPath[:len(srcPath)-1]
|
||||
}
|
||||
return &CSVFileER{erCfg: cfg, rdrDir: srcPath,
|
||||
rdrExit: rdrExit, appExit: appExit}, nil
|
||||
return &CSVFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
rdrDir: srcPath,
|
||||
rdrEvents: rdrEvents,
|
||||
rdrExit: rdrExit,
|
||||
appExit: appExit}, nil
|
||||
}
|
||||
|
||||
// CSVFileER implements EventReader interface for .csv files
|
||||
type CSVFileER struct {
|
||||
sync.RWMutex
|
||||
erCfg *config.EventReaderCfg
|
||||
rdrDir string
|
||||
rdrExit chan struct{}
|
||||
appExit chan bool
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
rdrDir string
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
rdrExit chan struct{}
|
||||
appExit chan bool
|
||||
conReqs chan struct{} // limit number of opened files
|
||||
}
|
||||
|
||||
func (rdr *CSVFileER) Config() *config.EventReaderCfg {
|
||||
return rdr.erCfg
|
||||
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
|
||||
}
|
||||
|
||||
func (rdr *CSVFileER) Init() (err error) {
|
||||
switch rdr.erCfg.RunDelay {
|
||||
case time.Duration(0): // 0 disables the automatic read, maybe per API
|
||||
func (rdr *CSVFileER) Serve() (err error) {
|
||||
switch rdr.Config().RunDelay {
|
||||
case time.Duration(0): // 0 disables the automatic read, maybe done per API
|
||||
return
|
||||
case time.Duration(-1):
|
||||
return watchDir(rdr.rdrDir, rdr.processFile,
|
||||
@@ -80,16 +100,126 @@ func (rdr *CSVFileER) Init() (err error) {
|
||||
}
|
||||
}()
|
||||
}
|
||||
time.Sleep(rdr.erCfg.RunDelay)
|
||||
time.Sleep(rdr.Config().RunDelay)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rdr *CSVFileER) Read() (ev *utils.CGREvent, err error) {
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
}
|
||||
absPath := path.Join(fPath, fName)
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
|
||||
var file *os.File
|
||||
if file, err = os.Open(absPath); err != nil {
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
csvReader := csv.NewReader(bufio.NewReader(file))
|
||||
csvReader.Comma = rune(rdr.Config().FieldSep[0])
|
||||
csvReader.Comment = '#'
|
||||
rowNr := 0 // This counts the rows in the file, not really number of CDRs
|
||||
evsPosted := 0
|
||||
timeStart := time.Now()
|
||||
reqVars := make(map[string]interface{})
|
||||
for {
|
||||
rowNr++
|
||||
var record []string
|
||||
if record, err = csvReader.Read(); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return
|
||||
}
|
||||
agReq := agents.NewAgentRequest(
|
||||
&csvProvider{req: record}, reqVars,
|
||||
nil, nil, rdr.Config().Tenant,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(rdr.Config().Timezone,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
continue
|
||||
}
|
||||
navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, rowNr, err.Error()))
|
||||
continue
|
||||
}
|
||||
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
|
||||
agReq.Tenant, utils.NestingSep),
|
||||
rdrCfg: rdr.Config()}
|
||||
evsPosted++
|
||||
}
|
||||
if rdr.Config().ProcessedPath != "" {
|
||||
// Finished with file, move it to processed folder
|
||||
outPath := path.Join(rdr.Config().ProcessedPath, fName)
|
||||
if err = os.Rename(absPath, outPath); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s",
|
||||
utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart)))
|
||||
return
|
||||
}
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *CSVFileER) processFile(itmPath, itmID string) (err error) {
|
||||
// csvProvider implements engine.DataProvider so we can pass it to filters
|
||||
type csvProvider struct {
|
||||
req []string
|
||||
cache *config.NavigableMap
|
||||
}
|
||||
|
||||
// String is part of engine.DataProvider interface
|
||||
// when called, it will display the already parsed values out of cache
|
||||
func (cP *csvProvider) String() string {
|
||||
return utils.ToJSON(cP)
|
||||
}
|
||||
|
||||
// FieldAsInterface is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if data, err = cP.cache.FieldAsInterface(fldPath); err == nil ||
|
||||
err != utils.ErrNotFound { // item found in cache
|
||||
return
|
||||
}
|
||||
err = nil // cancel previous err
|
||||
if cfgFieldIdx, err := strconv.Atoi(fldPath[0]); err != nil || len(cP.req) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err)
|
||||
} else {
|
||||
data = cP.req[cfgFieldIdx]
|
||||
}
|
||||
cP.cache.Set(fldPath, data, false, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FieldAsString is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) {
|
||||
var valIface interface{}
|
||||
valIface, err = cP.FieldAsInterface(fldPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(valIface), nil
|
||||
}
|
||||
|
||||
// AsNavigableMap is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) AsNavigableMap([]*config.FCTemplate) (
|
||||
nm *config.NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// RemoteHost is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) RemoteHost() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
|
||||
@@ -22,23 +22,23 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type EventReader interface {
|
||||
Config() *config.EventReaderCfg // return it's configuration
|
||||
Init() error // subscribe the reader on the path
|
||||
Read() (*utils.CGREvent, error) // produce a single record in the events file
|
||||
Serve() error // subscribe the reader on the path
|
||||
}
|
||||
|
||||
// NewEventReader instantiates the event reader based on configuration at index
|
||||
func NewEventReader(rdrCfg *config.EventReaderCfg,
|
||||
rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
|
||||
switch rdrCfg.Type {
|
||||
func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
|
||||
switch cfg.ERsCfg().Readers[cfgIdx].Type {
|
||||
default:
|
||||
err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type)
|
||||
err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
|
||||
case utils.MetaFileCSV:
|
||||
return NewCSVFileER(rdrCfg, rdrExit, appExit)
|
||||
return NewCSVFileER(cfg, cfgIdx, rdrEvents, fltrS, rdrExit, appExit)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user