08 Jun 2026 11:13 AM
Hello community,
I'm looking for help regarding bizevent processing in openpipeline.
It looks like that PARSE (into JSON_ARRAY) dql command doesn't have the same behavior between Openpipeline and Notebook
Data input :
Cart content Raw (in string) :
"input_array_in_string": "[{\"cartid\":\"202605270404285529\",\"itemType\":\"ITEM1\",\"itemAmount\":100},{\"cartid\":\"20260527040428553\",\"itemType\":\"ITEM2\",\"itemAmount\":200},{\"cartid\":\"20260527040456789\",\"itemType\":\"ITEM1\",\"itemAmount\":300},{\"cartid\":\"202605270404987654\",\"itemType\":\"ITEM3\",\"itemAmount\":400},{\"cartid\":\"2026052704654789\",\"itemType\":\"ITEM2\",\"itemAmount\":500},{\"cartid\":\"202605271098431\",\"itemType\":\"ITEM1\",\"itemAmount\":600}]"
Cart content (Structured as an array) :
[
{
"cartid": "202605270404285529",
"itemType": "ITEM1",
"itemAmount": "100"
},
{
"cartid": "20260527040428553",
"itemType": "ITEM2",
"itemAmount": "200"
},
{
"cartid": "20260527040456789",
"itemType": "ITEM1",
"itemAmount": "300"
},
{
"cartid": "202605270404987654",
"itemType": "ITEM3",
"itemAmount": "400"
},
{
"cartid": "2026052704654789",
"itemType": "ITEM2",
"itemAmount": "500"
},
{
"cartid": "202605271098431",
"itemType": "ITEM1",
"itemAmount": "600"
}
]
My need :
Found solution (tested OK in a notebook) :
Because of openpipeline limitation, I found a way to get the expected informations (Item1 cart amount and cart count) without using EXPAND dql command
// PARSE STRING AS JSON_ARRAY
| parse input_array_in_string, """JSON_ARRAY:output_array_in_notebook"""
// COLLECT IN A NEW ARRAY EACH CART ELEMENT CONTAINING ITEM1
| fieldsAdd notebook_array_of_item1 = arrayRemoveNulls( iCollectArray(if(output_array_in_notebook[][itemType]=="ITEM1",output_array_in_notebook[])))
// CACULATE VALUES : COUNT AND AMOUNT OF ITEM1 CARTS
| fieldsAdd notebook_item1_arraySize = arraySize(notebook_array_of_item1)
| fieldsAdd notebook_item1_cumulativeAmount =arraySum(iCollectArray(notebook_array_of_item1[][itemAmount]))
Result in a notebook
{
"records": [
{
"timestamp": "2026-06-08T10:33:48.836000000+02:00",
"output_array_in_notebook": [
{
"cartid": "202605270404285529",
"itemType": "ITEM1",
"itemAmount": "100"
},
{
"cartid": "20260527040428553",
"itemType": "ITEM2",
"itemAmount": "200"
},
{
"cartid": "20260527040456789",
"itemType": "ITEM1",
"itemAmount": "300"
},
{
"cartid": "202605270404987654",
"itemType": "ITEM3",
"itemAmount": "400"
},
{
"cartid": "2026052704654789",
"itemType": "ITEM2",
"itemAmount": "500"
},
{
"cartid": "202605271098431",
"itemType": "ITEM1",
"itemAmount": "600"
}
],
"notebook_array_of_item1": [
{
"cartid": "202605270404285529",
"itemType": "ITEM1",
"itemAmount": "100"
},
{
"cartid": "20260527040456789",
"itemType": "ITEM1",
"itemAmount": "300"
},
{
"cartid": "202605271098431",
"itemType": "ITEM1",
"itemAmount": "600"
}
],
"notebook_item1_arraySize": "3",
"notebook_item1_cumulativeAmount": 1000,
}
],
"types": [
{
"indexRange": [
0,
1
],
"mappings": {
"timestamp": {
"type": "timestamp"
},
"output_array_in_notebook": {
"type": "array",
"types": [
{
"indexRange": [
0,
5
],
"mappings": {
"element": {
"type": "record",
"types": [
{
"mappings": {
"cartid": {
"type": "string"
},
"itemType": {
"type": "string"
},
"itemAmount": {
"type": "long"
}
}
}
]
}
}
}
]
},
"notebook_array_of_item1": {
"type": "array",
"types": [
{
"indexRange": [
0,
2
],
"mappings": {
"element": {
"type": "record",
"types": [
{
"mappings": {
"cartid": {
"type": "string"
},
"itemType": {
"type": "string"
},
"itemAmount": {
"type": "long"
}
}
}
]
}
}
}
]
},
"notebook_item1_arraySize": {
"type": "long"
},
"notebook_item1_cumulativeAmount": {
"type": "double"
},
"input_array_in_string": {
"type": "string"
}
}
}
]
}
Unfortunately, I don't get any result when using the same commands in a openpipeline processor
DQL Process definition
parse input_array_in_string, """JSON_ARRAY:output_array"""
| fieldsAdd openpipeline_array_of_item1 = arrayRemoveNulls( iCollectArray(if(output_array[][itemType]=="ITEM1",output_array[])))
| fieldsAdd openpipeline_item1_arraySize = arraySize(openpipeline_array_of_item1)
| fieldsAdd openpipeline_item1_cumulativeAmount =arraySum(iCollectArray(openpipeline_array_of_item1[][itemAmount]))
Openpipeline result
{
"records": [
{
"timestamp": "2026-06-08T10:33:48.836000000+02:00",
"input_array_in_string": "[{\"cartid\":\"202605270404285529\",\"itemType\":\"ITEM1\",\"itemAmount\":100},{\"cartid\":\"20260527040428553\",\"itemType\":\"ITEM2\",\"itemAmount\":200},{\"cartid\":\"20260527040456789\",\"itemType\":\"ITEM1\",\"itemAmount\":300},{\"cartid\":\"202605270404987654\",\"itemType\":\"ITEM3\",\"itemAmount\":400},{\"cartid\":\"2026052704654789\",\"itemType\":\"ITEM2\",\"itemAmount\":500},{\"cartid\":\"202605271098431\",\"itemType\":\"ITEM1\",\"itemAmount\":600}]",
"id": "2",
"event.id": "d44d5a05-c6d5-4881-b04f-73e3caf5b88f",
"event.kind": "BIZ_EVENT",
"dt.openpipeline.source": "/api/v2/bizevents/ingest",
"dt.openpipeline.pipelines": [
"bizevents:pipeline_parse_json_array_5188"
],
"output_array": [
"{\"cartid\":\"202605270404285529\", \"itemType\":\"ITEM1\", \"itemAmount\":100}",
"{\"cartid\":\"20260527040428553\", \"itemType\":\"ITEM2\", \"itemAmount\":200}",
"{\"cartid\":\"20260527040456789\", \"itemType\":\"ITEM1\", \"itemAmount\":300}",
"{\"cartid\":\"202605270404987654\", \"itemType\":\"ITEM3\", \"itemAmount\":400}",
"{\"cartid\":\"2026052704654789\", \"itemType\":\"ITEM2\", \"itemAmount\":500}",
"{\"cartid\":\"202605271098431\", \"itemType\":\"ITEM1\", \"itemAmount\":600}"
],
"openpipeline_array_of_item1": [],
"openpipeline_item1_arraySize": "0",
"openpipeline_item1_cumulativeAmount": null
}
],
"types": [
{
"indexRange": [
0,
1
],
"mappings": {
"timestamp": {
"type": "timestamp"
},
"input_array_in_string": {
"type": "string"
},
"id": {
"type": "string"
},
"event.id": {
"type": "string"
},
"event.kind": {
"type": "string"
},
"dt.openpipeline.source": {
"type": "string"
},
"dt.openpipeline.pipelines": {
"type": "array",
"types": [
{
"indexRange": [
0,
0
],
"mappings": {
"element": {
"type": "string"
}
}
}
]
},
"output_array": {
"type": "array",
"types": [
{
"indexRange": [
0,
5
],
"mappings": {
"element": {
"type": "string"
}
}
}
]
},
"openpipeline_array_of_item1": {
"type": "array",
"types": []
},
"openpipeline_item1_arraySize": {
"type": "long"
},
"openpipeline_item1_cumulativeAmount": {
"type": "double"
}
}
}
]
}
Possible root cause
It seems that PARSE DQL command doesn't give the same result between Notebook and openpipeline
Difference between notebook and openpipeline
//NOTEBOOK
"output_array_in_notebook": [
{
"cartid": "202605270404285529",
"itemType": "ITEM1",
"itemAmount": "100"
},
{
"cartid": "20260527040428553",
"itemType": "ITEM2",
"itemAmount": "200"
},
{
"cartid": "20260527040456789",
"itemType": "ITEM1",
"itemAmount": "300"
},
{
"cartid": "202605270404987654",
"itemType": "ITEM3",
"itemAmount": "400"
},
{
"cartid": "2026052704654789",
"itemType": "ITEM2",
"itemAmount": "500"
},
{
"cartid": "202605271098431",
"itemType": "ITEM1",
"itemAmount": "600"
}
],
// OPENPIPELINE
"output_array": [
"{\"cartid\":\"202605270404285529\", \"itemType\":\"ITEM1\", \"itemAmount\":100}",
"{\"cartid\":\"20260527040428553\", \"itemType\":\"ITEM2\", \"itemAmount\":200}",
"{\"cartid\":\"20260527040456789\", \"itemType\":\"ITEM1\", \"itemAmount\":300}",
"{\"cartid\":\"202605270404987654\", \"itemType\":\"ITEM3\", \"itemAmount\":400}",
"{\"cartid\":\"2026052704654789\", \"itemType\":\"ITEM2\", \"itemAmount\":500}",
"{\"cartid\":\"202605271098431\", \"itemType\":\"ITEM1\", \"itemAmount\":600}"
],
Thank you
Freddy
Featured Posts