From e3055c8d9f93256f61cb6330a0d369531d083e48 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 4 Sep 2019 12:24:17 +0200 Subject: [PATCH] EventReader with CSV in alpha --- agents/agentreq.go | 18 ++--- agents/agentreq_test.go | 42 +++++----- agents/diamagent.go | 8 +- agents/diamagent_test.go | 10 +-- agents/dnsagent.go | 6 +- agents/httpagent.go | 6 +- agents/libdiam.go | 2 +- agents/librad_test.go | 6 +- agents/radagent.go | 8 +- config/config_test.go | 14 ++-- config/erscfg.go | 30 ++++---- config/erscfg_test.go | 48 ++++++------ ers/ers.go | 36 +++++---- ers/filecsv.go | 162 +++++++++++++++++++++++++++++++++++---- ers/reader.go | 14 ++-- 15 files changed, 272 insertions(+), 138 deletions(-) diff --git a/agents/agentreq.go b/agents/agentreq.go index 31b7a6c39..b9179c99f 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -31,7 +31,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func newAgentRequest(req config.DataProvider, +func NewAgentRequest(req config.DataProvider, vars map[string]interface{}, cgrRply *config.NavigableMap, rply *config.NavigableMap, @@ -50,16 +50,16 @@ func newAgentRequest(req config.DataProvider, CGRRequest: config.NewNavigableMap(nil), CGRReply: cgrRply, Reply: rply, - timezone: timezone, + Timezone: timezone, filterS: filterS, } // populate tenant if tntIf, err := ar.ParseField( &config.FCTemplate{Type: utils.META_COMPOSED, Value: tntTpl}); err == nil && tntIf.(string) != "" { - ar.tenant = tntIf.(string) + ar.Tenant = tntIf.(string) } else { - ar.tenant = dfltTenant + ar.Tenant = dfltTenant } return @@ -74,8 +74,8 @@ type AgentRequest struct { CGRReply *config.NavigableMap CGRAReq *config.NavigableMap // Used to acces live build in request; both available as active request and active reply Reply *config.NavigableMap - tenant, - timezone string + Tenant, + Timezone string filterS *engine.FilterS } @@ -127,7 +127,7 @@ func (ar *AgentRequest) AsNavigableMap(tplFlds []*config.FCTemplate) ( nM *config.NavigableMap, err error) { ar.CGRAReq = config.NewNavigableMap(nil) for _, tplFld := range tplFlds { - if pass, err := ar.filterS.Pass(ar.tenant, + if pass, err := ar.filterS.Pass(ar.Tenant, tplFld.Filters, ar); err != nil { return nil, err } else if !pass { @@ -207,11 +207,11 @@ func (aReq *AgentRequest) ParseField( if err != nil { return "", err } - tEnd, err := utils.ParseTimeDetectLayout(strVal1, aReq.timezone) + tEnd, err := utils.ParseTimeDetectLayout(strVal1, aReq.Timezone) if err != nil { return "", err } - tStart, err := utils.ParseTimeDetectLayout(strVal2, aReq.timezone) + tStart, err := utils.ParseTimeDetectLayout(strVal2, aReq.Timezone) if err != nil { return "", err } diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index a3535d002..208767aee 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -41,7 +41,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), @@ -137,7 +137,7 @@ func TestAgReqMaxCost(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false) @@ -182,7 +182,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", @@ -232,7 +232,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -272,7 +272,7 @@ Host: api.cgrates.org cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -343,7 +343,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "MandatoryFalse", FieldId: "MandatoryFalse", Type: utils.META_COMPOSED, @@ -371,7 +371,7 @@ func TestAgReqEmptyFilter(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), @@ -414,7 +414,7 @@ func TestAgReqMetaExponent(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) agReq.CGRRequest.Set([]string{"Value"}, "2", false, false) agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false) @@ -440,7 +440,7 @@ func TestAgReqCGRActiveRequest(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent tplFlds := []*config.FCTemplate{ @@ -483,7 +483,7 @@ func TestAgReqFieldAsNone(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -520,7 +520,7 @@ func TestAgReqFieldAsNone2(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -560,7 +560,7 @@ func TestAgReqAsNavigableMap2(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.ToR}, utils.VOICE, false, false) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -617,7 +617,7 @@ func TestAgReqFieldAsInterface(t *testing.T) { dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRAReq = config.NewNavigableMap(nil) agReq.CGRAReq.Set([]string{utils.Usage}, []*config.NMItem{{Data: 3 * time.Minute}}, false, false) @@ -683,7 +683,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { } cgrRply := config.NewNavigableMap(ev2) - agReq := newAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Fld1", @@ -724,7 +724,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) { } rply := config.NewNavigableMap(ev) - agReq := newAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) agReq.setCGRReply(nil, utils.ErrNotFound) @@ -773,7 +773,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) { utils.Error: "", } - agReq := newAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(nil, nil, nil, rply, nil, "cgrates.org", "", filterS) agReq.setCGRReply(myEv, nil) @@ -818,7 +818,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "CCUsage", Filters: []string{}, @@ -896,7 +896,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Usage", Filters: []string{}, @@ -962,7 +962,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Sum", Filters: []string{}, @@ -1006,7 +1006,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "Diff", Filters: []string{}, @@ -1050,7 +1050,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) { cfg, _ := config.NewDefaultCGRConfig() filterS := engine.NewFilterS(cfg, nil, nil, nil, dm) //pass the data provider to agent request - agReq := newAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ &config.FCTemplate{Tag: "ValExp", Filters: []string{}, diff --git a/agents/diamagent.go b/agents/diamagent.go index 640692941..283a7ae11 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -208,7 +208,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { var lclProcessed bool lclProcessed, err = da.processRequest( reqProcessor, - newAgentRequest( + NewAgentRequest( diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, @@ -248,14 +248,14 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { func (da *DiameterAgent) processRequest(reqProcessor *config.RequestProcessor, agReq *AgentRequest) (processed bool, err error) { - if pass, err := da.filterS.Pass(agReq.tenant, + if pass, err := da.filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err } if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil { return } - cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) + cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep) var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, @@ -442,7 +442,7 @@ func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r return utils.ErrMandatoryIeMissing } dmd := msg.(*diamMsgData) - aReq := newAgentRequest( + aReq := NewAgentRequest( newDADataProvider(dmd.c, dmd.m), dmd.vars, config.NewNavigableMap(nil), diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index 6a4e8f21a..77037de99 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -132,7 +132,7 @@ func TestProcessRequest(t *testing.T) { }, }} reqProcessor.Flags, _ = utils.FlagsWithParamsFromSlice([]string{utils.MetaAuth, utils.MetaAccounts}) - agReq := newAgentRequest(diamDP, reqVars, cgrRplyNM, rply, + agReq := NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters) da := &DiameterAgent{ @@ -220,7 +220,7 @@ func TestProcessRequest(t *testing.T) { cgrRplyNM = config.NewNavigableMap(nil) rply = config.NewNavigableMap(nil) - agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply, + agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters) da = &DiameterAgent{ @@ -308,7 +308,7 @@ func TestProcessRequest(t *testing.T) { }, }} - agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply, + agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters) da = &DiameterAgent{ @@ -410,7 +410,7 @@ func TestProcessRequest(t *testing.T) { }, }} - agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply, + agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters) da = &DiameterAgent{ @@ -498,7 +498,7 @@ func TestProcessRequest(t *testing.T) { }, }} - agReq = newAgentRequest(diamDP, reqVars, cgrRplyNM, rply, + agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, config.CgrConfig().GeneralCfg().DefaultTimezone, filters) da = &DiameterAgent{ diff --git a/agents/dnsagent.go b/agents/dnsagent.go index ee8a2b4f5..fe3bbe4f0 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -101,7 +101,7 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { var lclProcessed bool lclProcessed, err = da.processRequest( reqProcessor, - newAgentRequest( + NewAgentRequest( dnsDP, reqVars, cgrRplyNM, rplyNM, reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant, @@ -150,14 +150,14 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor, agReq *AgentRequest) (processed bool, err error) { - if pass, err := da.fltrS.Pass(agReq.tenant, + if pass, err := da.fltrS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err } if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil { return } - cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) + cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep) var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/httpagent.go b/agents/httpagent.go index 0cdd6fb0a..f96830a6b 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -61,7 +61,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { cgrRplyNM := config.NewNavigableMap(nil) rplyNM := config.NewNavigableMap(nil) for _, reqProcessor := range ha.reqProcessors { - agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, + agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, reqProcessor.Tenant, ha.dfltTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), @@ -98,14 +98,14 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { // processRequest represents one processor processing the request func (ha *HTTPAgent) processRequest(reqProcessor *config.RequestProcessor, agReq *AgentRequest) (processed bool, err error) { - if pass, err := ha.filterS.Pass(agReq.tenant, + if pass, err := ha.filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err } if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil { return } - cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) + cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep) var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/agents/libdiam.go b/agents/libdiam.go index a71a1beab..15d193d9e 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -477,7 +477,7 @@ func diamErr(m *diam.Message, resCode uint32, reqVars map[string]interface{}, tpl []*config.FCTemplate, tnt, tmz string, filterS *engine.FilterS) (a *diam.Message, err error) { - aReq := newAgentRequest( + aReq := NewAgentRequest( newDADataProvider(nil, m), reqVars, config.NewNavigableMap(nil), config.NewNavigableMap(nil), diff --git a/agents/librad_test.go b/agents/librad_test.go index 048915843..61f40b2bc 100644 --- a/agents/librad_test.go +++ b/agents/librad_test.go @@ -96,7 +96,7 @@ func TestRadComposedFieldValue(t *testing.T) { if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil { t.Error(err) } - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false) @@ -116,7 +116,7 @@ func TestRadFieldOutVal(t *testing.T) { t.Error(err) } eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart) - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false) agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false) agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false) @@ -137,7 +137,7 @@ func TestRadReplyAppendAttributes(t *testing.T) { &config.FCTemplate{Tag: "Acct-Session-Time", FieldId: "Acct-Session-Time", Type: utils.META_COMPOSED, Value: config.NewRSRParsersMustCompile("~*cgrep.MaxUsage{*duration_seconds}", true, utils.INFIELD_SEP)}, } - agReq := newAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil) agReq.CGRReply.Set([]string{utils.CapMaxUsage}, time.Duration(time.Hour), false, false) agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, "AccessAccept", false, false) agReq.CGRReply.Set([]string{utils.CapAttributes, utils.Account}, "1001", false, false) diff --git a/agents/radagent.go b/agents/radagent.go index f8b15593a..dcc6a41a9 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -79,7 +79,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e rplyNM := config.NewNavigableMap(nil) var processed bool for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors { - agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, + agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), @@ -116,7 +116,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e rplyNM := config.NewNavigableMap(nil) var processed bool for _, reqProcessor := range ra.cgrCfg.RadiusAgentCfg().RequestProcessors { - agReq := newAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, + agReq := NewAgentRequest(dcdr, nil, cgrRplyNM, rplyNM, reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), @@ -144,14 +144,14 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e // processRequest represents one processor processing the request func (ra *RadiusAgent) processRequest(reqProcessor *config.RequestProcessor, agReq *AgentRequest, rply *radigo.Packet) (processed bool, err error) { - if pass, err := ra.filterS.Pass(agReq.tenant, + if pass, err := ra.filterS.Pass(agReq.Tenant, reqProcessor.Filters, agReq); err != nil || !pass { return pass, err } if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil { return } - cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) + cgrEv := agReq.CGRRequest.AsCGREvent(agReq.Tenant, utils.NestingSep) var reqType string for _, typ := range []string{ utils.MetaDryRun, utils.MetaAuth, diff --git a/config/config_test.go b/config/config_test.go index cfccd68fb..b6bf9b4dc 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -191,7 +191,7 @@ func TestCgrCfgCDRC(t *testing.T) { { "id": "*default", "enabled": true, // enable CDR client functionality - "content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + "Content_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"field_id": "ToR", "type": "*composed", "value": "~7:s/^(voice|data|sms|mms|generic)$/*$1/"}, {"field_id": "AnswerTime", "type": "*composed", "value": "~1"}, {"field_id": "Usage", "type": "*composed", "value": "~9:s/^(\\d+)$/${1}s/"}, @@ -1826,8 +1826,8 @@ func TestCgrCdfEventReader(t *testing.T) { Timezone: utils.EmptyString, Filters: []string{}, Flags: utils.FlagsWithParams{}, - Header_fields: make([]*FCTemplate, 0), - Content_fields: []*FCTemplate{ + HeaderFields: make([]*FCTemplate, 0), + ContentFields: []*FCTemplate{ {Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true}, {Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED, @@ -1851,7 +1851,7 @@ func TestCgrCdfEventReader(t *testing.T) { {Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true}, }, - Trailer_fields: make([]*FCTemplate, 0), + TrailerFields: make([]*FCTemplate, 0), }, }, } @@ -1875,8 +1875,8 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { Timezone: utils.EmptyString, Filters: nil, Flags: utils.FlagsWithParams{}, - Header_fields: make([]*FCTemplate, 0), - Content_fields: []*FCTemplate{ + HeaderFields: make([]*FCTemplate, 0), + ContentFields: []*FCTemplate{ {Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true}, {Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED, @@ -1900,7 +1900,7 @@ func TestCgrCfgEventReaderDefault(t *testing.T) { {Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true}, }, - Trailer_fields: make([]*FCTemplate, 0), + TrailerFields: make([]*FCTemplate, 0), } if !reflect.DeepEqual(cgrCfg.dfltEvRdr, eCfg) { t.Errorf("received: %+v,\n expecting: %+v", utils.ToJSON(cgrCfg.dfltEvRdr), utils.ToJSON(eCfg)) diff --git a/config/erscfg.go b/config/erscfg.go index aeef9196a..3fe1fb37a 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -91,9 +91,9 @@ type EventReaderCfg struct { Timezone string Filters []string Flags utils.FlagsWithParams - Header_fields []*FCTemplate - Content_fields []*FCTemplate - Trailer_fields []*FCTemplate + HeaderFields []*FCTemplate + ContentFields []*FCTemplate + TrailerFields []*FCTemplate Continue bool } @@ -148,17 +148,17 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string } } if jsnCfg.Header_fields != nil { - if er.Header_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil { + if er.HeaderFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil { return err } } if jsnCfg.Content_fields != nil { - if er.Content_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Content_fields, sep); err != nil { + if er.ContentFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Content_fields, sep); err != nil { return err } } if jsnCfg.Trailer_fields != nil { - if er.Trailer_fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Trailer_fields, sep); err != nil { + if er.TrailerFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Trailer_fields, sep); err != nil { return err } } @@ -195,17 +195,17 @@ func (er *EventReaderCfg) Clone() (cln *EventReaderCfg) { } } cln.Flags = er.Flags - cln.Header_fields = make([]*FCTemplate, len(er.Header_fields)) - for idx, fld := range er.Header_fields { - cln.Header_fields[idx] = fld.Clone() + cln.HeaderFields = make([]*FCTemplate, len(er.HeaderFields)) + for idx, fld := range er.HeaderFields { + cln.HeaderFields[idx] = fld.Clone() } - cln.Content_fields = make([]*FCTemplate, len(er.Content_fields)) - for idx, fld := range er.Content_fields { - cln.Content_fields[idx] = fld.Clone() + cln.ContentFields = make([]*FCTemplate, len(er.ContentFields)) + for idx, fld := range er.ContentFields { + cln.ContentFields[idx] = fld.Clone() } - cln.Trailer_fields = make([]*FCTemplate, len(er.Trailer_fields)) - for idx, fld := range er.Trailer_fields { - cln.Trailer_fields[idx] = fld.Clone() + cln.TrailerFields = make([]*FCTemplate, len(er.TrailerFields)) + for idx, fld := range er.TrailerFields { + cln.TrailerFields[idx] = fld.Clone() } cln.Continue = er.Continue return diff --git a/config/erscfg_test.go b/config/erscfg_test.go index 24774c442..9dfe8faa1 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -27,14 +27,14 @@ import ( func TestEventRedearClone(t *testing.T) { orig := &EventReaderCfg{ - ID: utils.MetaDefault, - Type: "RandomType", - FieldSep: ",", - SourceID: "RandomSource", - Filters: []string{"Filter1", "Filter2"}, - Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), - Header_fields: []*FCTemplate{}, - Content_fields: []*FCTemplate{ + ID: utils.MetaDefault, + Type: "RandomType", + FieldSep: ",", + SourceID: "RandomSource", + Filters: []string{"Filter1", "Filter2"}, + Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), + HeaderFields: []*FCTemplate{}, + ContentFields: []*FCTemplate{ { Tag: "TOR", FieldId: "ToR", @@ -50,22 +50,22 @@ func TestEventRedearClone(t *testing.T) { Mandatory: true, }, }, - Trailer_fields: []*FCTemplate{}, - Continue: true, + TrailerFields: []*FCTemplate{}, + Continue: true, } cloned := orig.Clone() if !reflect.DeepEqual(cloned, orig) { t.Errorf("expected: %s \n,received: %s", utils.ToJSON(orig), utils.ToJSON(cloned)) } initialOrig := &EventReaderCfg{ - ID: utils.MetaDefault, - Type: "RandomType", - FieldSep: ",", - SourceID: "RandomSource", - Filters: []string{"Filter1", "Filter2"}, - Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), - Header_fields: []*FCTemplate{}, - Content_fields: []*FCTemplate{ + ID: utils.MetaDefault, + Type: "RandomType", + FieldSep: ",", + SourceID: "RandomSource", + Filters: []string{"Filter1", "Filter2"}, + Tenant: NewRSRParsersMustCompile("cgrates.org", true, utils.INFIELD_SEP), + HeaderFields: []*FCTemplate{}, + ContentFields: []*FCTemplate{ { Tag: "TOR", FieldId: "ToR", @@ -81,11 +81,11 @@ func TestEventRedearClone(t *testing.T) { Mandatory: true, }, }, - Trailer_fields: []*FCTemplate{}, - Continue: true, + TrailerFields: []*FCTemplate{}, + Continue: true, } orig.Filters = []string{"SingleFilter"} - orig.Content_fields = []*FCTemplate{ + orig.ContentFields = []*FCTemplate{ { Tag: "TOR", FieldId: "ToR", @@ -122,8 +122,8 @@ func TestEventReaderLoadFromJSON(t *testing.T) { Timezone: utils.EmptyString, Filters: nil, Flags: utils.FlagsWithParams{}, - Header_fields: make([]*FCTemplate, 0), - Content_fields: []*FCTemplate{ + HeaderFields: make([]*FCTemplate, 0), + ContentFields: []*FCTemplate{ {Tag: "TOR", FieldId: "ToR", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~2", true, utils.INFIELD_SEP), Mandatory: true}, {Tag: "OriginID", FieldId: "OriginID", Type: utils.META_COMPOSED, @@ -147,7 +147,7 @@ func TestEventReaderLoadFromJSON(t *testing.T) { {Tag: "Usage", FieldId: "Usage", Type: utils.META_COMPOSED, Value: NewRSRParsersMustCompile("~13", true, utils.INFIELD_SEP), Mandatory: true}, }, - Trailer_fields: make([]*FCTemplate, 0), + TrailerFields: make([]*FCTemplate, 0), }, }, } diff --git a/ers/ers.go b/ers/ers.go index 6bdf71ae9..36d94bcb9 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -66,8 +66,8 @@ type ERService struct { // ListenAndServe keeps the service alive func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { - for _, rdrCfg := range erS.cfg.ERsCfg().Readers { - if err = erS.addReader(rdrCfg); err != nil { + for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers { + if err = erS.addReader(rdrCfg.ID, cfgIdx); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> adding reader <%s> got error: <%s>", utils.ERs, rdrCfg.ID, err.Error())) @@ -92,14 +92,16 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) { } // addReader will add a new reader to the service -func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) { - erS.stopLsn[rdrCfg.ID] = make(chan struct{}) +func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { + erS.stopLsn[rdrID] = make(chan struct{}) var rdr EventReader - if rdr, err = NewEventReader(rdrCfg, erS.stopLsn[rdrCfg.ID], erS.exitChan); err != nil { + if rdr, err = NewEventReader(erS.cfg, cfgIdx, + erS.rdrEvents, erS.filterS, + erS.stopLsn[rdrID], erS.exitChan); err != nil { return } - erS.rdrs[rdrCfg.ID] = rdr - return rdr.Init() + erS.rdrs[rdrID] = rdr + return rdr.Serve() } // handleReloads will handle the config reloads which are signaled over cfgRldChan @@ -109,17 +111,19 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { case <-erS.exitChan: return case <-cfgRldChan: - cfgIDs := make(map[string]*config.EventReaderCfg) + cfgIDs := make(map[string]int) pathReloaded := make(map[string]struct{}) // index config IDs - for _, rdrCfg := range erS.cfg.ERsCfg().Readers { - cfgIDs[rdrCfg.ID] = rdrCfg + for i, rdrCfg := range erS.cfg.ERsCfg().Readers { + cfgIDs[rdrCfg.ID] = i } erS.Lock() // remove the necessary ids - for id := range erS.rdrs { - if newCfg, has := cfgIDs[id]; has { // still present - if newCfg.SourcePath == erS.rdrPaths[id] { + for id, rdr := range erS.rdrs { + if cfgIdx, has := cfgIDs[id]; has { // still present + newCfg := erS.cfg.ERsCfg().Readers[cfgIdx] + if newCfg.SourcePath == erS.rdrPaths[id] && + newCfg.ID == rdr.Config().ID { // make sure the index did not change continue } pathReloaded[id] = struct{}{} @@ -129,16 +133,16 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) { delete(erS.stopLsn, id) } // add new ids - for id, rdrCfg := range cfgIDs { + for id, rdrIdx := range cfgIDs { if _, has := erS.rdrs[id]; has { if _, has := pathReloaded[id]; !has { continue } } - if err := erS.addReader(rdrCfg); err != nil { + if err := erS.addReader(id, rdrIdx); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> adding reader <%s> got error: <%s>", - utils.ERs, rdrCfg.ID, err.Error())) + utils.ERs, id, err.Error())) erS.exitChan <- true } } diff --git a/ers/filecsv.go b/ers/filecsv.go index c4a9f0a35..b00f45eae 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -19,42 +19,62 @@ along with this program. If not, see package ers import ( + "bufio" + "encoding/csv" "fmt" + "io" "io/ioutil" + "net" + "os" + "path" + "strconv" "strings" "sync" "time" + "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewCSVFileER(cfg *config.EventReaderCfg, +func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, + rdrEvents chan *erEvent, fltrS *engine.FilterS, rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { - srcPath := cfg.SourcePath + srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } - return &CSVFileER{erCfg: cfg, rdrDir: srcPath, - rdrExit: rdrExit, appExit: appExit}, nil + return &CSVFileER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + rdrExit: rdrExit, + appExit: appExit}, nil } // CSVFileER implements EventReader interface for .csv files type CSVFileER struct { sync.RWMutex - erCfg *config.EventReaderCfg - rdrDir string - rdrExit chan struct{} - appExit chan bool + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + rdrExit chan struct{} + appExit chan bool + conReqs chan struct{} // limit number of opened files } func (rdr *CSVFileER) Config() *config.EventReaderCfg { - return rdr.erCfg + return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -func (rdr *CSVFileER) Init() (err error) { - switch rdr.erCfg.RunDelay { - case time.Duration(0): // 0 disables the automatic read, maybe per API +func (rdr *CSVFileER) Serve() (err error) { + switch rdr.Config().RunDelay { + case time.Duration(0): // 0 disables the automatic read, maybe done per API return case time.Duration(-1): return watchDir(rdr.rdrDir, rdr.processFile, @@ -80,16 +100,126 @@ func (rdr *CSVFileER) Init() (err error) { } }() } - time.Sleep(rdr.erCfg.RunDelay) + time.Sleep(rdr.Config().RunDelay) } } } -func (rdr *CSVFileER) Read() (ev *utils.CGREvent, err error) { +// processFile is called for each file in a directory and dispatches erEvents from it +func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { + if cap(rdr.conReqs) != 0 { // 0 goes for no limit + processFile := <-rdr.conReqs // Queue here for maxOpenFiles + defer func() { rdr.conReqs <- processFile }() + } + absPath := path.Join(fPath, fName) + utils.Logger.Info( + fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) + var file *os.File + if file, err = os.Open(absPath); err != nil { + return + } + defer file.Close() + csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.Comma = rune(rdr.Config().FieldSep[0]) + csvReader.Comment = '#' + rowNr := 0 // This counts the rows in the file, not really number of CDRs + evsPosted := 0 + timeStart := time.Now() + reqVars := make(map[string]interface{}) + for { + rowNr++ + var record []string + if record, err = csvReader.Read(); err != nil { + if err == io.EOF { + break + } + return + } + agReq := agents.NewAgentRequest( + &csvProvider{req: record}, reqVars, + nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS) // create an AgentRequest + if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq); err != nil || !pass { + continue + } + navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", + utils.ERs, absPath, rowNr, err.Error())) + continue + } + rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent( + agReq.Tenant, utils.NestingSep), + rdrCfg: rdr.Config()} + evsPosted++ + } + if rdr.Config().ProcessedPath != "" { + // Finished with file, move it to processed folder + outPath := path.Join(rdr.Config().ProcessedPath, fName) + if err = os.Rename(absPath, outPath); err != nil { + return + } + } + + utils.Logger.Info( + fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", + utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart))) return } -// processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *CSVFileER) processFile(itmPath, itmID string) (err error) { +// csvProvider implements engine.DataProvider so we can pass it to filters +type csvProvider struct { + req []string + cache *config.NavigableMap +} + +// String is part of engine.DataProvider interface +// when called, it will display the already parsed values out of cache +func (cP *csvProvider) String() string { + return utils.ToJSON(cP) +} + +// FieldAsInterface is part of engine.DataProvider interface +func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) { + if len(fldPath) != 1 { + return nil, utils.ErrNotFound + } + if data, err = cP.cache.FieldAsInterface(fldPath); err == nil || + err != utils.ErrNotFound { // item found in cache + return + } + err = nil // cancel previous err + if cfgFieldIdx, err := strconv.Atoi(fldPath[0]); err != nil || len(cP.req) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err) + } else { + data = cP.req[cfgFieldIdx] + } + cP.cache.Set(fldPath, data, false, false) return } + +// FieldAsString is part of engine.DataProvider interface +func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) { + var valIface interface{} + valIface, err = cP.FieldAsInterface(fldPath) + if err != nil { + return + } + return utils.IfaceAsString(valIface), nil +} + +// AsNavigableMap is part of engine.DataProvider interface +func (cP *csvProvider) AsNavigableMap([]*config.FCTemplate) ( + nm *config.NavigableMap, err error) { + return nil, utils.ErrNotImplemented +} + +// RemoteHost is part of engine.DataProvider interface +func (cP *csvProvider) RemoteHost() net.Addr { + return utils.LocalAddr() +} diff --git a/ers/reader.go b/ers/reader.go index 6e7867e3d..d74b1e1b1 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -22,23 +22,23 @@ import ( "fmt" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) type EventReader interface { Config() *config.EventReaderCfg // return it's configuration - Init() error // subscribe the reader on the path - Read() (*utils.CGREvent, error) // produce a single record in the events file + Serve() error // subscribe the reader on the path } // NewEventReader instantiates the event reader based on configuration at index -func NewEventReader(rdrCfg *config.EventReaderCfg, - rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { - switch rdrCfg.Type { +func NewEventReader(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, + fltrS *engine.FilterS, rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) { + switch cfg.ERsCfg().Readers[cfgIdx].Type { default: - err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type) + err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type) case utils.MetaFileCSV: - return NewCSVFileER(rdrCfg, rdrExit, appExit) + return NewCSVFileER(cfg, cfgIdx, rdrEvents, fltrS, rdrExit, appExit) } return }