Example: Basic Python code that converts an NDJSON file containing events into a Parquet file, which can then be uploaded using the Amazon S3 integration with Split. To learn more about this integration, refer to the Amazon S3 integration guide.
Environment:
- Python 3.7
- pandas 1.2.2
- pyarrow 3.0.0
- ndjson 0.3.1
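If these packages are not already installed, the pinned versions above can be installed with pip (adjust versions as needed for your environment):

pip install pandas==1.2.2 pyarrow==3.0.0 ndjson==0.3.1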
How to use:
- The code expects the NDJSON file to contain the correct data structure for Split events; refer to the example record below (pretty-printed here for readability).
{"environmentId":"029bd160-7e36-11e8-9a1c-0acd31e5aef0",
"trafficTypeId":"e6910420-5c85-11e9-bbc9-12a5cc2af8fe",
"eventTypeId":"agus_event",
"key":"gDxEVkUsd3",
"timestamp":1625088419634,
"value":86.5588664346,
"properties":{"age":"51","country":"argentina","tier":"basic"}}
- In the code below, be sure to replace the source NDJSON and target Parquet file names and paths with your own.
- The code assumes that all property values are strings.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ndjson

##################################
input_file = "sample_ndjson.json"
output_file = "event21.parquet"
##################################

# Converts a properties dictionary into a list of (key, value) tuples,
# the structure pyarrow stores as a Parquet MAP when the column type
# is pa.map_(). All values are coerced to strings, per the assumption above.
def dict2keyvalue(d):
    return [(key, str(value)) for key, value in d.items()]

properties_type = pa.map_(
    pa.string(),
    pa.string(),
)

# Schema matching the Split event record structure shown above
schema = pa.schema(
    [
        pa.field("environmentId", pa.string()),
        pa.field("trafficTypeId", pa.string()),
        pa.field("eventTypeId", pa.string()),
        pa.field("key", pa.string()),
        pa.field("timestamp", pa.int64()),
        pa.field("value", pa.float64()),
        pa.field("properties", properties_type),
    ]
)

# Load all NDJSON records into a DataFrame
with open(input_file) as f:
    js = ndjson.load(f)
data = pd.DataFrame(js)

# Convert each properties dict into the map-compatible representation
data["properties"] = data["properties"].apply(dict2keyvalue)

# Apply the schema and save to a Parquet file
data = pa.Table.from_pandas(data, schema=schema)
pq.write_table(data, output_file)
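As an optional sanity check, you can read the Parquet file back and confirm the schema and data look as expected (a minimal sketch, using the output file name from above):

import pyarrow.parquet as pq

table = pq.read_table("event21.parquet")
print(table.schema)
print(table.to_pandas().head())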