diff --git a/agents/agentreq.go b/agents/agentreq.go index 0b47eb23b..9b197b342 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -40,7 +40,7 @@ func NewAgentRequest(req utils.DataProvider, tntTpl config.RSRParsers, dfltTenant, timezone string, filterS *engine.FilterS, - header, trailer utils.DataProvider) (ar *AgentRequest) { + extraDP map[string]utils.DataProvider) (ar *AgentRequest) { if cgrRply == nil { cgrRply = &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)} } @@ -53,6 +53,9 @@ func NewAgentRequest(req utils.DataProvider, if opts == nil { opts = make(utils.MapStorage) } + if extraDP == nil { + extraDP = make(map[string]utils.DataProvider) + } ar = &AgentRequest{ Request: req, Tenant: dfltTenant, @@ -63,10 +66,9 @@ func NewAgentRequest(req utils.DataProvider, Reply: rply, Timezone: timezone, filterS: filterS, - Header: header, - Trailer: trailer, Opts: opts, Cfg: config.CgrConfig().GetDataProvider(), + ExtraDP: extraDP, } if tnt, err := tntTpl.ParseDataProvider(ar); err == nil && tnt != utils.EmptyString { ar.Tenant = tnt @@ -86,12 +88,11 @@ type AgentRequest struct { Tenant string Timezone string filterS *engine.FilterS - Header utils.DataProvider - Trailer utils.DataProvider diamreq *utils.OrderedNavigableMap // used in case of building requests (ie. DisconnectSession) tmp *utils.DataNode // used in case you want to store temporary items and access them later Opts utils.MapStorage Cfg utils.DataProvider + ExtraDP map[string]utils.DataProvider } // String implements utils.DataProvider @@ -108,7 +109,11 @@ func (ar *AgentRequest) RemoteHost() net.Addr { func (ar *AgentRequest) FieldAsInterface(fldPath []string) (val interface{}, err error) { switch fldPath[0] { default: - return nil, fmt.Errorf("unsupported field prefix: <%s>", fldPath[0]) + dp, has := ar.ExtraDP[fldPath[0]] + if !has { + return nil, fmt.Errorf("unsupported field prefix: <%s>", fldPath[0]) + } + val, err = dp.FieldAsInterface(fldPath[1:]) case utils.MetaReq: val, err = ar.Request.FieldAsInterface(fldPath[1:]) case utils.MetaVars: @@ -121,10 +126,6 @@ func (ar *AgentRequest) FieldAsInterface(fldPath []string) (val interface{}, err val, err = ar.diamreq.FieldAsInterface(fldPath[1:]) case utils.MetaRep: val, err = ar.Reply.FieldAsInterface(fldPath[1:]) - case utils.MetaHdr: - val, err = ar.Header.FieldAsInterface(fldPath[1:]) - case utils.MetaTrl: - val, err = ar.Trailer.FieldAsInterface(fldPath[1:]) case utils.MetaTmp: val, err = ar.tmp.FieldAsInterface(fldPath[1:]) case utils.MetaUCH: diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index b1ef52060..19b142a57 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -42,7 +42,7 @@ func TestAgReqSetFields(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.CGRID, PathSlice: []string{utils.CGRID}}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String())) @@ -145,7 +145,7 @@ func TestAgentRequestSetFields(t *testing.T) { ar := NewAgentRequest(utils.MapStorage(req), nil, nil, nil, nil, config.NewRSRParsersMustCompile("", utils.NestingSep), "cgrates.org", "", engine.NewFilterS(cfg, nil, dm), - utils.MapStorage(req), utils.MapStorage(req)) + map[string]utils.DataProvider{utils.MetaHdr: utils.MapStorage(req), utils.MetaTrl: utils.MapStorage(req)}) input := []*config.FCTemplate{} if err := ar.SetFields(input); err != nil { t.Error(err) @@ -491,7 +491,7 @@ func TestAgReqMaxCost(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.CapMaxUsage, PathSlice: []string{utils.CapMaxUsage}}, utils.NewLeafNode("120s")) @@ -538,7 +538,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "MandatoryFalse", @@ -591,7 +591,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "MandatoryFalse", Path: "MandatoryFalse", Type: utils.MetaComposed, @@ -634,7 +634,7 @@ Host: api.cgrates.org dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "MandatoryFalse", Path: "MandatoryFalse", Type: utils.MetaComposed, @@ -706,7 +706,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "MandatoryFalse", Path: "MandatoryFalse", Type: utils.MetaComposed, @@ -734,7 +734,7 @@ func TestAgReqEmptyFilter(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.CGRID, PathSlice: []string{utils.CGRID}}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String())) @@ -776,7 +776,7 @@ func TestAgReqMetaExponent(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: "Value", PathSlice: []string{"Value"}}, utils.NewLeafNode("2")) agReq.CGRRequest.Set(&utils.FullPath{Path: "Exponent", PathSlice: []string{"Exponent"}}, utils.NewLeafNode("2")) @@ -802,7 +802,7 @@ func TestAgReqFieldAsNone(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -840,7 +840,7 @@ func TestAgReqFieldAsNone2(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -880,7 +880,7 @@ func TestAgReqSetField2(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -936,7 +936,7 @@ func TestAgReqFieldAsInterface(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest = utils.NewOrderedNavigableMap() agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Usage, PathSlice: []string{utils.Usage}}, &utils.DataNode{Type: utils.NMSliceType, Slice: []*utils.DataNode{{Type: utils.NMDataType, Value: &utils.DataLeaf{Data: 3 * time.Minute}}}}) @@ -996,7 +996,7 @@ func TestAgReqNewARWithCGRRplyAndRply(t *testing.T) { utils.Error: utils.NewLeafNode(""), }} - agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, cgrRply, rply, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Fld1", @@ -1033,7 +1033,7 @@ func TestAgReqSetCGRReplyWithError(t *testing.T) { rply.Set(&utils.FullPath{ Path: "FirstLevel.SecondLevel.Fld1", PathSlice: []string{"FirstLevel", "SecondLevel", "Fld1"}}, utils.NewLeafNode("Val1")) - agReq := NewAgentRequest(nil, nil, nil, rply, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, rply, nil, nil, "cgrates.org", "", filterS, nil) agReq.setCGRReply(nil, utils.ErrNotFound) @@ -1081,7 +1081,7 @@ func TestAgReqSetCGRReplyWithoutError(t *testing.T) { } agReq := NewAgentRequest(nil, nil, nil, rply, nil, - nil, "cgrates.org", "", filterS, nil, nil) + nil, "cgrates.org", "", filterS, nil) agReq.setCGRReply(myEv, nil) @@ -1128,7 +1128,7 @@ func TestAgReqParseFieldMetaCCUsage(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "CCUsage", Filters: []string{}, @@ -1212,7 +1212,7 @@ func TestAgReqParseFieldMetaUsageDifference(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Usage", Filters: []string{}, @@ -1278,7 +1278,7 @@ func TestAgReqParseFieldMetaSum(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Sum", Filters: []string{}, @@ -1322,7 +1322,7 @@ func TestAgReqParseFieldMetaDifference(t *testing.T) { config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Diff", Filters: []string{}, @@ -1366,7 +1366,7 @@ func TestAgReqParseFieldMetaMultiply(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Multiply", Filters: []string{}, @@ -1410,7 +1410,7 @@ func TestAgReqParseFieldMetaDivide(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Divide", Filters: []string{}, @@ -1454,7 +1454,7 @@ func TestAgReqParseFieldMetaValueExponent(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "ValExp", Filters: []string{}, @@ -1507,7 +1507,7 @@ func TestAgReqOverwrite(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -1559,7 +1559,7 @@ func TestAgReqGroupType(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -1603,7 +1603,7 @@ func TestAgReqSetFieldsInTmp(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) tplFlds := []*config.FCTemplate{ @@ -1635,7 +1635,7 @@ func TestAgReqSetFieldsIp2Hex(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: "IP", PathSlice: []string{"IP"}}, utils.NewLeafNode("62.87.114.244")) tplFlds := []*config.FCTemplate{ @@ -1662,7 +1662,7 @@ func TestAgReqSetFieldsString2Hex(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: "CustomField", PathSlice: []string{"CustomField"}}, utils.NewLeafNode(string([]byte{0x94, 0x71, 0x02, 0x31, 0x01, 0x59}))) tplFlds := []*config.FCTemplate{ @@ -1689,7 +1689,7 @@ func TestAgReqSetFieldsWithRemove(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.CGRID, PathSlice: []string{utils.CGRID}}, utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String())) @@ -1818,7 +1818,7 @@ func TestAgReqSetFieldsInCache(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) engine.NewCacheS(cfg, dm, nil) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) tplFlds := []*config.FCTemplate{ @@ -1861,7 +1861,7 @@ func TestAgReqSetFieldsInCacheWithTimeOut(t *testing.T) { cfg.CacheCfg().Partitions[utils.CacheUCH].TTL = time.Second engine.Cache = engine.NewCacheS(cfg, dm, nil) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) tplFlds := []*config.FCTemplate{ @@ -1954,7 +1954,7 @@ func TestAgReqFiltersInsideField(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) //pass the data provider to agent request - agReq := NewAgentRequest(newDADataProvider(nil, m), nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(newDADataProvider(nil, m), nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "Usage", @@ -1984,7 +1984,7 @@ func TestAgReqDynamicPath(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -2052,7 +2052,7 @@ func TestAgReqRoundingDecimals(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -2128,7 +2128,7 @@ func BenchmarkAgReqSetField(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathSlice: []string{utils.ToR}}, utils.NewLeafNode(utils.MetaVoice)) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AccountField, PathSlice: []string{utils.AccountField}}, utils.NewLeafNode("1001")) @@ -2169,7 +2169,7 @@ func TestAgReqSetFieldsFromCfg(t *testing.T) { dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) tplFlds := []*config.FCTemplate{ {Tag: "CfgField", @@ -2223,7 +2223,7 @@ func TestFieldAsInterface(t *testing.T) { func TestAgentRequestParseFieldDateTimeDaily(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*daily", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{ Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep), @@ -2252,7 +2252,7 @@ func TestAgentRequestParseFieldDateTimeDaily(t *testing.T) { func TestAgentRequestParseFieldDateTimeTimeZone(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*daily", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*daily", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2280,7 +2280,7 @@ func TestAgentRequestParseFieldDateTimeTimeZone(t *testing.T) { func TestAgentRequestParseFieldDateTimeMonthly(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*monthly", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*monthly", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2308,7 +2308,7 @@ func TestAgentRequestParseFieldDateTimeMonthly(t *testing.T) { func TestAgentRequestParseFieldDateTimeMonthlyEstimated(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*monthly_estimated", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*monthly_estimated", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2336,7 +2336,7 @@ func TestAgentRequestParseFieldDateTimeMonthlyEstimated(t *testing.T) { func TestAgentRequestParseFieldDateTimeYearly(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*yearly", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*yearly", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2364,7 +2364,7 @@ func TestAgentRequestParseFieldDateTimeYearly(t *testing.T) { func TestAgentRequestParseFieldDateTimeMetaUnlimited(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile(utils.MetaUnlimited, utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile(utils.MetaUnlimited, utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2392,7 +2392,7 @@ func TestAgentRequestParseFieldDateTimeMetaUnlimited(t *testing.T) { func TestAgentRequestParseFieldDateTimeEmpty(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2420,7 +2420,7 @@ func TestAgentRequestParseFieldDateTimeEmpty(t *testing.T) { func TestAgentRequestParseFieldDateTimeMonthEnd(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2448,7 +2448,7 @@ func TestAgentRequestParseFieldDateTimeMonthEnd(t *testing.T) { func TestAgentRequestParseFieldDateTimeError(t *testing.T) { tntTpl := config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep) - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, tntTpl, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: config.NewRSRParsersMustCompile("*month_endTest", utils.InfieldSep), Layout: "“Mon Jan _2 15:04:05 2006”", @@ -2467,7 +2467,7 @@ func TestAgentRequestParseFieldDateTimeError2(t *testing.T) { if err != nil { t.Fatal(err) } - AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, prsr, "", "", nil, nil, nil) + AgentReq := NewAgentRequest(utils.MapStorage{}, nil, nil, nil, nil, prsr, "", "", nil, nil) fctTemp := &config.FCTemplate{Type: utils.MetaDateTime, Value: prsr, Layout: "“Mon Jan _2 15:04:05 2006”", diff --git a/agents/diamagent.go b/agents/diamagent.go index 6ea4fbedf..5dbd4b7fb 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -278,7 +278,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { reqProcessor.Tenant, da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, da.cgrCfg.GeneralCfg().DefaultTimezone), - da.filterS, nil, nil)) + da.filterS, nil)) if lclProcessed { processed = lclProcessed } @@ -531,7 +531,7 @@ func (da *DiameterAgent) sendASR(originID string, reply *string) (err error) { newDADataProvider(dmd.c, dmd.m), dmd.vars, nil, nil, nil, nil, da.cgrCfg.GeneralCfg().DefaultTenant, - da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil, nil) + da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil) if err = aReq.SetFields(da.cgrCfg.TemplatesCfg()[da.cgrCfg.DiameterAgentCfg().ASRTemplate]); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> cannot disconnect session with OriginID: <%s>, err: %s", @@ -574,7 +574,7 @@ func (da *DiameterAgent) V1ReAuthorize(ctx *context.Context, originID string, re newDADataProvider(dmd.c, dmd.m), dmd.vars, nil, nil, nil, nil, da.cgrCfg.GeneralCfg().DefaultTenant, - da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil, nil) + da.cgrCfg.GeneralCfg().DefaultTimezone, da.filterS, nil) if err = aReq.SetFields(da.cgrCfg.TemplatesCfg()[da.cgrCfg.DiameterAgentCfg().RARTemplate]); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> cannot send RAR with OriginID: <%s>, err: %s", diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go index c65273259..9040ec222 100644 --- a/agents/diamagent_test.go +++ b/agents/diamagent_test.go @@ -441,7 +441,7 @@ func TestProcessRequest(t *testing.T) { reqProcessor.Flags = utils.FlagsWithParamsFromSlice([]string{utils.MetaAuthorize, utils.MetaAccounts}) agReq := NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, nil, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, - config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil, nil) + config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) internalSessionSChan := make(chan birpc.ClientConnector, 1) internalSessionSChan <- sS @@ -471,7 +471,7 @@ func TestProcessRequest(t *testing.T) { agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, nil, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, - config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil, nil) + config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) pr, err = da.processRequest(reqProcessor, agReq) if err != nil { @@ -488,7 +488,7 @@ func TestProcessRequest(t *testing.T) { agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, nil, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, - config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil, nil) + config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) pr, err = da.processRequest(reqProcessor, agReq) if err != nil { @@ -511,7 +511,7 @@ func TestProcessRequest(t *testing.T) { agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, nil, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, - config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil, nil) + config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) pr, err = da.processRequest(reqProcessor, agReq) if err != nil { @@ -528,7 +528,7 @@ func TestProcessRequest(t *testing.T) { agReq = NewAgentRequest(diamDP, reqVars, cgrRplyNM, rply, nil, reqProcessor.Tenant, config.CgrConfig().GeneralCfg().DefaultTenant, - config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil, nil) + config.CgrConfig().GeneralCfg().DefaultTimezone, filters, nil) pr, err = da.processRequest(reqProcessor, agReq) if err != nil { diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 32fab2164..ed42b010d 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -130,7 +130,7 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { da.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(da.cgrCfg.DNSAgentCfg().Timezone, da.cgrCfg.GeneralCfg().DefaultTimezone), - da.fltrS, nil, nil)) + da.fltrS, nil)) if lclProcessed { processed = lclProcessed } diff --git a/agents/httpagent.go b/agents/httpagent.go index 16686c319..6ce04a06b 100644 --- a/agents/httpagent.go +++ b/agents/httpagent.go @@ -73,7 +73,7 @@ func (ha *HTTPAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { opts, reqProcessor.Tenant, ha.dfltTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ha.filterS, nil, nil) + ha.filterS, nil) lclProcessed, err := ha.processRequest(reqProcessor, agReq) if err != nil { utils.Logger.Warning( diff --git a/agents/libdiam.go b/agents/libdiam.go index ad21930e2..41b074d28 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -466,7 +466,7 @@ func diamErr(m *diam.Message, resCode uint32, aReq := NewAgentRequest( newDADataProvider(nil, m), reqVars, nil, nil, nil, nil, - tnt, tmz, filterS, nil, nil) + tnt, tmz, filterS, nil) if err = aReq.SetFields(tpl); err != nil { return } diff --git a/agents/libdiam_test.go b/agents/libdiam_test.go index f41e28e6d..d4c817b02 100644 --- a/agents/libdiam_test.go +++ b/agents/libdiam_test.go @@ -1150,7 +1150,7 @@ func TestFilterWithDiameterDP(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + agReq := NewAgentRequest(dP, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil) if pass, err := filterS.Pass(context.TODO(), "cgrates.org", []string{"*exists:~*req.Multiple-Services-Credit-Control.Rating-Group[~Rating-Group(99)]:"}, agReq); err != nil { diff --git a/agents/librad_test.go b/agents/librad_test.go index 0c46ebd53..c56d8c2ab 100644 --- a/agents/librad_test.go +++ b/agents/librad_test.go @@ -89,7 +89,7 @@ func TestRadReplyAppendAttributes(t *testing.T) { for _, v := range rplyFlds { v.ComputePath() } - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil) agReq.CGRReply.Set([]string{utils.CapMaxUsage}, utils.NewLeafNode(time.Hour)) agReq.CGRReply.Set([]string{utils.CapAttributes, "RadReply"}, utils.NewLeafNode("AccessAccept")) agReq.CGRReply.Set([]string{utils.CapAttributes, utils.AccountField}, utils.NewLeafNode("1001")) diff --git a/agents/libsip.go b/agents/libsip.go index c8fab905a..e8eee6416 100644 --- a/agents/libsip.go +++ b/agents/libsip.go @@ -50,7 +50,7 @@ func sipErr(m utils.DataProvider, sipMessage sipingo.Message, aReq := NewAgentRequest( m, reqVars, nil, nil, nil, nil, - tnt, tmz, filterS, nil, nil) + tnt, tmz, filterS, nil) if err = aReq.SetFields(tpl); err != nil { return } diff --git a/agents/libsip_test.go b/agents/libsip_test.go index 32178e7a9..98af5def9 100644 --- a/agents/libsip_test.go +++ b/agents/libsip_test.go @@ -41,7 +41,7 @@ func TestUpdateSIPMsgFromNavMap(t *testing.T) { for _, v := range rplyFlds { v.ComputePath() } - agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil) agReq.CGRReply.Set([]string{utils.CapMaxUsage}, utils.NewLeafNode(time.Hour)) agReq.CGRReply.Set([]string{utils.CapAttributes, "Request"}, utils.NewLeafNode("SIP/2.0 302 Moved Temporarily")) agReq.CGRReply.Set([]string{utils.CapAttributes, utils.AccountField}, utils.NewLeafNode("1001")) diff --git a/agents/radagent.go b/agents/radagent.go index d3145fea0..a2c808453 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -90,7 +90,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ra.filterS, nil, nil) + ra.filterS, nil) agReq.Vars.Map[MetaRadReqType] = utils.NewLeafNode(MetaRadAuth) var lclProcessed bool if lclProcessed, err = ra.processRequest(req, reqProcessor, agReq, rpl); lclProcessed { @@ -135,7 +135,7 @@ func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err e reqProcessor.Tenant, ra.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - ra.filterS, nil, nil) + ra.filterS, nil) var lclProcessed bool if lclProcessed, err = ra.processRequest(req, reqProcessor, agReq, rpl); lclProcessed { processed = lclProcessed diff --git a/agents/sipagent.go b/agents/sipagent.go index 286573d08..95a16b907 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -333,7 +333,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(reqProcessor.Timezone, config.CgrConfig().GeneralCfg().DefaultTimezone), - sa.filterS, nil, nil) + sa.filterS, nil) var lclProcessed bool if lclProcessed, err = sa.processRequest(reqProcessor, agReq); err != nil { utils.Logger.Warning( diff --git a/config/config_defaults.go b/config/config_defaults.go index 3a86b97ce..76284dda6 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -333,6 +333,9 @@ const CGRATES_CFG_JSON = ` // "fstFailedCallsPrefix": "" // Used in case of flatstore CDRs to avoid searching for BYE records // "fstRecordCacheTTL": "1s" // Duration to cache partial records when not pairing // "fstLazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field + "fstMethod": "~*req.1", // the rsr parser that will determine the method of the current record + "fstOriginID": "~*req.3;~*req.1;~*req.2", // the rsr parser that will determine the originID of the current record + "fstMadatoryACK": false, // if we should receive the ACK before processing the record // FileXML "xmlRootPath": "", // path towards one event in case of XML CDRs diff --git a/config/slicedp.go b/config/slicedp.go index 10662df83..29249a23a 100644 --- a/config/slicedp.go +++ b/config/slicedp.go @@ -30,7 +30,7 @@ import ( func NewSliceDP(record []string, indxAls map[string]int) (dP utils.DataProvider) { return &SliceDP{ req: record, - cache: utils.MapStorage{}, + cache: utils.MapStorage{utils.Length: len(record)}, idxAls: indxAls, } } diff --git a/config/slicedp_test.go b/config/slicedp_test.go index 54bdf791a..6b81797ed 100644 --- a/config/slicedp_test.go +++ b/config/slicedp_test.go @@ -43,7 +43,7 @@ func TestNewSliceDp(t *testing.T) { } expected := &SliceDP{ req: record, - cache: utils.MapStorage{}, + cache: utils.MapStorage{utils.Length: len(record)}, idxAls: index, } if newSliceDP := NewSliceDP(record, index); !reflect.DeepEqual(expected, newSliceDP) { diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index 7c2df0d80..af99a985e 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -355,6 +355,7 @@ {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true}, {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} diff --git a/ers/amqp.go b/ers/amqp.go index 83761f3a5..7ae389456 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -192,7 +192,7 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/amqpv1.go b/ers/amqpv1.go index aca161b96..0e02cf5ac 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -166,7 +166,7 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/filecsv.go b/ers/filecsv.go index e6b2bb35c..f1a106072 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -169,7 +169,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( diff --git a/ers/filefwv.go b/ers/filefwv.go index 7c8d89135..e31560c81 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -199,7 +199,7 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, rdr.headerDP, rdr.trailerDP) // create an AgentRequest + rdr.fltrS, map[string]utils.DataProvider{utils.MetaHdr: rdr.headerDP, utils.MetaTrl: rdr.trailerDP}) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( @@ -294,7 +294,7 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, rdr.trailerDP) // create an AgentRequest + rdr.fltrS, map[string]utils.DataProvider{utils.MetaTrl: rdr.trailerDP}) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil @@ -334,7 +334,7 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, rdr.headerDP, nil) // create an AgentRequest + rdr.fltrS, map[string]utils.DataProvider{utils.MetaHdr: rdr.headerDP}) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { return nil diff --git a/ers/filejson.go b/ers/filejson.go index 1a9bc4cc5..ab52a0ec7 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -150,7 +150,7 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( diff --git a/ers/filexml.go b/ers/filexml.go index 997e30b06..7bcf82b77 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -149,7 +149,7 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( diff --git a/ers/flatstore.go b/ers/flatstore.go index 66e38e0d1..86434af7b 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -20,12 +20,10 @@ package ers import ( "encoding/csv" - "errors" "fmt" "io" "os" "path" - "strconv" "strings" "sync" "time" @@ -40,9 +38,8 @@ import ( ) type fstRecord struct { - inv []string - bye []string - ack []string + method string + values []string fileName string } @@ -161,7 +158,20 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { evsPosted := 0 timeStart := time.Now() reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.FileName: utils.NewLeafNode(fName)}} - failCallPrfx := utils.IfaceAsString(rdr.Config().Opts[utils.FstFailedCallsPrefixOpt]) + faildCallPrfx := utils.IfaceAsString(rdr.Config().Opts[utils.FstFailedCallsPrefixOpt]) + failedCallsFile := len(faildCallPrfx) != 0 && strings.HasPrefix(fName, faildCallPrfx) + var methodTmp config.RSRParsers + if methodTmp, err = config.NewRSRParsers(utils.IfaceAsString(rdr.Config().Opts[utils.FstMethodOpt]), rdr.cgrCfg.GeneralCfg().RSRSep); err != nil { + return + } + var originTmp config.RSRParsers + if originTmp, err = config.NewRSRParsers(utils.IfaceAsString(rdr.Config().Opts[utils.FstOriginIDOpt]), rdr.cgrCfg.GeneralCfg().RSRSep); err != nil { + return + } + var mandatoryAcK bool + if mandatoryAcK, err = utils.IfaceAsBool(rdr.Config().Opts[utils.FstMadatoryACKOpt]); err != nil { + return + } for { var record []string if record, err = csvReader.Read(); err != nil { @@ -170,48 +180,46 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { } return } - if strings.HasPrefix(fName, failCallPrfx) { // Use the first index since they should be the same in all configs - record = append(record, "0") // Append duration 0 for failed calls flatstore CDR - } else { - pr, err := NewUnpairedRecord(record, utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone), fName) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting row : <%s> to unpairedRecord , ignoring due to error: <%s>", - utils.ERs, record, err.Error())) - continue - } - if val, has := rdr.cache.Get(pr.OriginID); !has { - rdr.cache.Set(pr.OriginID, pr, nil) - continue - } else { - pair := val.(*UnpairedRecord) - record, err = pairToRecord(pair, pr) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Merging unpairedRecords : <%s> and <%s> to record , ignoring due to error: <%s>", - utils.ERs, utils.ToJSON(pair), utils.ToJSON(pr), err.Error())) - continue - } - rdr.cache.Set(pr.OriginID, nil, nil) - rdr.cache.Remove(pr.OriginID) - } + req := config.NewSliceDP(record, nil) + tmpReq := utils.MapStorage{utils.MetaReq: req} + var method string + if method, err = methodTmp.ParseDataProvider(tmpReq); err != nil { + return + } else if method != utils.FstInvite && + method != utils.FstBye && + method != utils.FstAck { + return fmt.Errorf("Unsuported method<%s>", method) } - // build Usage from Fields based on record lenght - for i, cntFld := range rdr.Config().Fields { - if cntFld.Path == utils.MetaCgreq+utils.NestingSep+utils.Usage { - rdr.Config().Fields[i].Value = config.NewRSRParsersMustCompile("~*req."+strconv.Itoa(len(record)-1), utils.InfieldSep) // in case of flatstore, last element will be the duration computed by us - } + var originID string + if originID, err = originTmp.ParseDataProvider(tmpReq); err != nil { + return } + + records := rdr.cache.GetGroupItems(originID) + + if lrecords := len(records); !failedCallsFile && // do not set in cache if we know that the calls are failed + (lrecords == 0 || + (mandatoryAcK && lrecords != 2) || + (!mandatoryAcK && lrecords != 1)) { + rdr.cache.Set(utils.ConcatenatedKey(originID, method), &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) + continue + } + rdr.cache.RemoveGroup(originID) + extraDP := map[string]utils.DataProvider{method: req} + for _, record := range records { + req := record.(*fstRecord) + extraDP[req.method] = config.NewSliceDP(req.values, nil) + } + rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( - config.NewSliceDP(record, nil), reqVars, + req, reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( @@ -249,6 +257,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { return } +/* func NewUnpairedRecord(record []string, timezone string, fileName string) (*UnpairedRecord, error) { if len(record) < 7 { return nil, errors.New("MISSING_IE") @@ -307,14 +316,14 @@ func pairToRecord(part1, part2 *UnpairedRecord) ([]string, error) { record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) return record, nil } - +*/ func (rdr *FlatstoreER) dumpToFile(itmID string, value interface{}) { if value == nil { return } - unpRcd := value.(*UnpairedRecord) + unpRcd := value.(*fstRecord) - dumpFilePath := path.Join(rdr.Config().ProcessedPath, unpRcd.FileName+utils.TmpSuffix) + dumpFilePath := path.Join(rdr.Config().ProcessedPath, unpRcd.fileName+utils.TmpSuffix) fileOut, err := os.Create(dumpFilePath) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", @@ -323,11 +332,11 @@ func (rdr *FlatstoreER) dumpToFile(itmID string, value interface{}) { } csvWriter := csv.NewWriter(fileOut) csvWriter.Comma = rune(utils.IfaceAsString(rdr.Config().Opts[utils.FlatstorePrfx+utils.FieldSepOpt])[0]) - if err = csvWriter.Write(unpRcd.Values); err != nil { + if err = csvWriter.Write(unpRcd.values); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", - utils.ERs, unpRcd.Values, dumpFilePath, err.Error())) - return + utils.ERs, unpRcd.values, dumpFilePath, err.Error())) + // return // let it close the opened file } - csvWriter.Flush() + fileOut.Close() } diff --git a/ers/flatstore_it_test.go b/ers/flatstore_it_test.go index d67daa2dc..0af5abdd3 100644 --- a/ers/flatstore_it_test.go +++ b/ers/flatstore_it_test.go @@ -21,15 +21,21 @@ along with this program. If not, see package ers import ( + "bytes" + "fmt" + "log" "net/rpc" "os" "path" + "reflect" + "strings" "testing" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) var ( @@ -206,3 +212,484 @@ func testFlatstoreITKillEngine(t *testing.T) { t.Error(err) } } + +func TestFlatstoreProcessEvent(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l")) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "g", + utils.AnswerTime: "k", + utils.Category: "f", + utils.Destination: "i", + utils.OriginID: "b", + utils.RequestType: "c", + utils.SetupTime: "j", + utils.Subject: "h", + utils.Tenant: "e", + utils.ToR: "ToR", + utils.Usage: "0", + }, + APIOpts: map[string]interface{}{}, + } + eR.conReqs <- struct{}{} + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + select { + case data := <-eR.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +//Test the case in which the file name does not match a prefix +func TestFlatstoreProcessEvent2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + //baToR + file.Write([]byte("INVITE,a,ToR,b,c,d,2013-12-30T15:00:01Z,f,g,h,i,j,k,l")) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"} + pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname} + eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) + eR.cache.Set("baToR", pr, nil) + expEvent := &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "g", + utils.AnswerTime: "k", + utils.Category: "f", + utils.Destination: "i", + utils.OriginID: "b", + utils.RequestType: "c", + utils.SetupTime: "j", + utils.Subject: "h", + utils.Tenant: "2013-12-30T15:00:01Z", + utils.ToR: "ToR", + utils.Usage: "3600", + }, + APIOpts: map[string]interface{}{}, + } + eR.conReqs <- struct{}{} + eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + select { + case data := <-eR.rdrEvents: + expEvent.ID = data.cgrEvent.ID + expEvent.Time = data.cgrEvent.Time + if !reflect.DeepEqual(data.cgrEvent, expEvent) { + t.Errorf("Expected %v but received %v", utils.ToJSON(expEvent), utils.ToJSON(data.cgrEvent)) + } + case <-time.After(50 * time.Millisecond): + t.Error("Time limit exceeded") + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFlatstoreProcessEvent2CacheNotSet(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + file.Write([]byte("INVITE,a,ToR,b,c,d,2013-12-30T15:00:01Z,f,g,h,i,j,k,l")) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + + eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) + + eR.conReqs <- struct{}{} + eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +//Test unsupported time format err while making the unpaired record. +func TestFlatstoreProcessEvent2Error1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + //Create new logger + utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) + if err != nil { + t.Error(err) + } + utils.Logger.SetLogLevel(7) + buf := new(bytes.Buffer) + log.SetOutput(buf) + file.Write([]byte("INVITE,a,ToR,b,c,d,invalid_time,f,g,h,i,j,k,l")) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "invalid_time", "f", "g", "h", "i", "j", "k", "l"} + pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname} + eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) + eR.cache.Set("baToR", pr, nil) + + eR.conReqs <- struct{}{} + eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + errExpect := "[WARNING] Converting row : <[INVITE a ToR b c d invalid_time f g h i j k l]> to unpairedRecord , ignoring due to error: " + if rcv := buf.String(); !strings.Contains(rcv, errExpect) { + t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } + buf.Reset() +} + +//Test pairToRecord() error, where both methods of unpaired record object are INVITE +func TestFlatstoreProcessEvent2Error2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + //Create new logger + utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) + if err != nil { + t.Error(err) + } + utils.Logger.SetLogLevel(7) + buf := new(bytes.Buffer) + log.SetOutput(buf) + //baToR + file.Write([]byte("INVITE,a,ToR,b,c,d,2013-12-30T15:00:01Z,f,g,h,i,j,k,l")) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + record := []string{"INVITE", "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"} + pr := &fstRecord{method: utils.FstInvite, values: record, fileName: fname} + eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) + eR.cache.Set("baToR", pr, nil) + eR.conReqs <- struct{}{} + eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" + if err := eR.processFile(filePath, fname); err != nil { + t.Error(err) + } + errExpect := "[WARNING] Merging unpairedRecords" + if rcv := buf.String(); !strings.Contains(rcv, errExpect) { + t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } + buf.Reset() +} + +//Fields in template are empty in order to trigger SetFields() error +func TestFlatstoreProcessEventError2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage + ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + + eR.Config().Fields = []*config.FCTemplate{ + {}, + } + + errExpect := "unsupported type: <>" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +//Test invalid filters in order to trigger Pass() function error +func TestFlatstoreProcessEventError3(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ERsCfg().Readers[0].Fields = []*config.FCTemplate{} + data := engine.NewInternalDB(nil, nil, true) + dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) + cfg.ERsCfg().Readers[0].ProcessedPath = "" + fltrs := engine.NewFilterS(cfg, nil, dm) + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + if err := os.MkdirAll(filePath, 0777); err != nil { + t.Error(err) + } + file, err := os.Create(path.Join(filePath, fname)) + if err != nil { + t.Error(err) + } + file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage + ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) + file.Close() + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + + // + eR.Config().Filters = []string{"Filter1"} + errExpect := "NOT_FOUND:Filter1" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + + // + eR.Config().Filters = []string{"*exists:~*req..Account:"} + errExpect = "Invalid fieldPath [ Account]" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFlatstoreProcessEventDirError(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + filePath := "/tmp/TestFlatstoreProcessEvent/" + fname := "file1.csv" + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/ers/out/", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + errExpect := "open /tmp/TestFlatstoreProcessEvent/file1.csv: no such file or directory" + if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { + t.Errorf("Expected %v but received %v", errExpect, err) + } +} + +func TestFlatstore(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + if err := eR.Serve(); err != nil { + t.Error(err) + } +} + +func TestFlatstoreServeDefault(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + filePath := "/tmp/flatstoreErs/out" + err := os.MkdirAll(filePath, 0777) + if err != nil { + t.Error(err) + } + for i := 1; i < 4; i++ { + if _, err := os.Create(path.Join(filePath, fmt.Sprintf("file%d.csv", i))); err != nil { + t.Error(err) + } + } + eR.Config().RunDelay = 1 * time.Millisecond + os.Create(path.Join(filePath, "file1.txt")) + go func() { + time.Sleep(20 * time.Millisecond) + close(eR.rdrExit) + }() + eR.serveDefault() + if err := os.RemoveAll(filePath); err != nil { + t.Error(err) + } +} + +func TestFileFlatstoreExit(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fltrs := &engine.FilterS{} + eR := &FlatstoreER{ + cgrCfg: cfg, + cfgIdx: 0, + fltrS: fltrs, + rdrDir: "/tmp/flatstoreErs/out", + rdrEvents: make(chan *erEvent, 1), + rdrError: make(chan error, 1), + rdrExit: make(chan struct{}), + conReqs: make(chan struct{}, 1), + } + eR.conReqs <- struct{}{} + eR.Config().RunDelay = 1 * time.Millisecond + if err := eR.Serve(); err != nil { + t.Error(err) + } + eR.rdrExit <- struct{}{} +} + +func TestFlatstoreServeErrTimeDurationNeg1(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfgIdx := 0 + rdr, err := NewFlatstoreER(cfg, cfgIdx, nil, nil, nil, nil) + if err != nil { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) + } + rdr.Config().RunDelay = time.Duration(-1) + expected := "no such file or directory" + err = rdr.Serve() + if err == nil || err.Error() != expected { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) + } +} diff --git a/ers/kafka.go b/ers/kafka.go index e3e35152a..5577351af 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -163,7 +163,7 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 80220f48d..0f0fdfd6a 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -186,7 +186,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest if pass, err := rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil { utils.Logger.Warning( diff --git a/ers/s3.go b/ers/s3.go index 7fd642e92..4aa6cfbf0 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -121,7 +121,7 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/sql.go b/ers/sql.go index 874f19e78..2bfdbc4ce 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -241,7 +241,7 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/ers/sqs.go b/ers/sqs.go index 143a00697..431aa0010 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -107,7 +107,7 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil, nil) // create an AgentRequest + rdr.fltrS, nil) // create an AgentRequest var pass bool if pass, err = rdr.fltrS.Pass(context.TODO(), agReq.Tenant, rdr.Config().Filters, agReq); err != nil || !pass { diff --git a/utils/consts.go b/utils/consts.go index 06e8a497a..f86e3479e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2257,7 +2257,6 @@ const ( OptsContext = "*context" Subsys = "*subsys" OptsAttributesProcessRuns = "*processRuns" - OptsDispatcherMethod = "*method" MetaEventType = "*eventType" EventType = "EventType" SchedulerInit = "SchedulerInit" @@ -2350,10 +2349,16 @@ const ( PartialCSVRecordCacheOpt = "csvRecordCacheTTL" // flatStore - FlatstorePrfx = "fst" + FlatstorePrfx = "fst" + OptsMethod = "*method" + FstInvite = "INVITE" + FstBye = "BYE" + FstAck = "ACK" + FstFailedCallsPrefixOpt = "fstFailedCallsPrefix" FstPartialRecordCacheOpt = "fstRecordCacheTTL" - FstMethodOpt = "fstMethod" // + FstMethodOpt = "fstMethod" + FstOriginIDOpt = "fstOriginID" FstMadatoryACKOpt = "fstMadatoryACK" // fileXML