How to Flatten Object Types and Query Arrays in Semi-Structured Nested JSON for Effective Data Extraction
Flattening nested JSON (or MongoDB’s BSON) and normalizing semi-structured data so that it can be queried, for analytics or for everyday lookups, is a common challenge in data processing.
In this article, we’ll explore multiple methods for extracting data from semi-structured JSON into a SQL-readable table format, including Spark-based techniques that either infer the schema automatically or use a schema you define by hand.
We’ll also cover other ways to flatten semi-structured JSON data, including recursive flattening, full flattening into the main table, and creating separate tables for array keys.
TL;DR
Multiple Methods: We’ll cover various techniques for effective data extraction:
Flattening first-level keys into columns
Recursively flattening nested keys
Creating separate tables for array types
Using pandas.json_normalize
Flattening in PySpark with automatic schema inference
Real-World Tools: Insights into how tools like Airbyte, HevoData, and Fivetran manage data flattening.
Let’s dive in and simplify your data extraction game!
Method 1: Flatten 1st Level JSON keys into respective columns
For illustration purposes, we have taken an example JSON that has 2 levels of nesting. A field in it can be an integer, a string, an object, or an array of integers, strings or objects. We want to "explode" the nested data into a table format.
Here’s the JSON structure. We will be using this same data throughout this blog.
data = [
{
"id": 1,
"name": "John Doe",
"contact": {
"email": "[email protected]",
"phone": "123-456-7890"
},
"location": {
"city": "Anytown",
"coordinates": {"lat": 40.7128, "long": -74.0060}
},
"projects": [
{
"title": "Project Alpha",
"budget": 100000,
"tasks": [
{"task_name": "Task A", "task_id": 1},
{"task_name": "Task B", "task_id": 2}
]
},
{
"title": "Project Beta",
"budget": 50000,
"tasks": [
{"task_name": "Task C", "task_id": 3}
]
}
]
},
{
"id": 2,
"name": "Jane Smith",
"contact": {
"email": "[email protected]",
"phone": "None"
},
"location": {
"city": "Othertown",
"coordinates": {"lat": 51.5074, "long": -0.1278}
},
"projects": [
{
"title": "Project Gamma",
"budget": 75000,
"tasks": [
{"task_name": "Task D", "task_id": 4}
]
}
]
}
]
We inserted this data into our MongoDB server and now it's ready to be flattened.
Here’s a Python script that:
1. Reads your data from MongoDB
2. Ingests it into MySQL, flattening the top-level keys ONLY by one level (we’ll discuss full flattening in the coming methods)
Code - GitHub Gist for the Flattening Method 1 Code
Output:
Output with Level 1 flattening:
Array keys remain as they are; they are not flattened.
Objects like:
contact: { email: '[email protected]', phone: '123-456-7890' }
become the columns contact_email and contact_phone.
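The mapping described above can be sketched in plain Python, without the MongoDB and MySQL parts. This is a minimal illustration and not the code from the Gist: the function names flatten_first_level and _to_cell are hypothetical, the sample values are made up, and storing arrays and deeper objects as JSON strings is an assumption about how such columns would be kept.

```python
import json


def _to_cell(value):
    # Arrays and deeper objects are not flattened; keep them as JSON text.
    if isinstance(value, (dict, list)):
        return json.dumps(value)
    return value


def flatten_first_level(record, sep="_"):
    """Expand top-level objects by one level; leave everything deeper as is."""
    row = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # One level only: each inner key becomes "<outer><sep><inner>".
            for inner_key, inner_value in value.items():
                row[f"{key}{sep}{inner_key}"] = _to_cell(inner_value)
        else:
            row[key] = _to_cell(value)
    return row


# Made-up sample record shaped like the data above.
sample = {
    "id": 1,
    "contact": {"email": "john@example.com", "phone": "123-456-7890"},
    "location": {"city": "Anytown", "coordinates": {"lat": 40.7128, "long": -74.006}},
    "projects": [{"title": "Project Alpha", "budget": 100000}],
}

row = flatten_first_level(sample)
print(sorted(row))
```

Here contact turns into contact_email and contact_phone, while projects and location_coordinates stay as single JSON-valued columns.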
The schema:
Now, you can easily write SQL to query any data you need without worrying about the levels of nested data you have.
If you wish to see how the script works internally and how it maps keys to columns, the debugging messages below might be useful to you.
Things to keep in mind:
How well the script handles numeric KEY names (not values).
What happens when two different paths flatten to the same column name. For example, the data:
{'a_b':{'c':1}, 'a':{'b_c':2}}
flattens into a_b_c = 1 and a_b_c = 2 when "_" is the separator. This creates ambiguity in column names, so choose your key names and separator carefully.
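One way to catch this before loading is to check whether two different key paths produce the same flattened name. The sketch below is a hypothetical helper (leaf_paths and find_collisions are names made up for this illustration), and the "__" separator is just one possible alternative:

```python
from collections import defaultdict


def leaf_paths(obj, path=()):
    """Yield the key path (as a tuple of strings) of every leaf in a nested dict."""
    for key, value in obj.items():
        new_path = path + (str(key),)  # str() also covers numeric key names
        if isinstance(value, dict) and value:
            yield from leaf_paths(value, new_path)
        else:
            yield new_path


def find_collisions(record, sep="_"):
    """Return flattened names that are produced by more than one key path."""
    seen = defaultdict(list)
    for path in leaf_paths(record):
        seen[sep.join(path)].append(path)
    return {name: paths for name, paths in seen.items() if len(paths) > 1}


record = {"a_b": {"c": 1}, "a": {"b_c": 2}}
with_underscore = find_collisions(record, sep="_")
with_double_underscore = find_collisions(record, sep="__")
print(with_underscore)
print(with_double_underscore)
```

With "_" both paths collapse into a_b_c; with "__" they stay distinct (a_b__c and a__b_c), so no collision is reported.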
In the previous method we flattened only the top-level keys. Now let's flatten every key-value pair, at every level, into its own column.
Method 2: Flatten each nested key into a separate column along with their values
In this method, we will flatten all keys into their respective columns. In case “...along with their values” is confusing: when a top-level key is nested, its inner keys are effectively the values of the outer key, and because this is a horizontal flattening method, we flatten those “values” too.
Code:
from collections.abc import MutableMapping

def flatten(dictionary, parent_key="", separator='.'):
    """
    Turn a nested dictionary into a flattened dictionary
    :param dictionary: The dictionary to flatten
    :param parent_key: The string to prepend to dictionary's keys
    :param separator: The string used to separate flattened keys
    :return: A flattened dictionary
    """
    items = []
    for key, value in dictionary.items():
        new_key = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, MutableMapping):
            if not value:
                # Empty object: keep the key with no value
                items.append((new_key, None))
            else:
                items.extend(flatten(value, new_key, separator).items())
        elif isinstance(value, list):
            if len(value):
                # Each array element gets its index as part of the key
                for idx, item in enumerate(value):
                    items.extend(flatten({str(idx): item}, new_key, separator).items())
            else:
                items.append((new_key, None))
        else:
            items.append((new_key, value))
    return dict(items)
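Replace the flattening logic from Method 1 with the function above. This code flattens everything, including array elements, whose index becomes part of the column name (for example, projects.0.title). Let’s take an example to demonstrate.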
We have taken the same dataset as used in method 1
Output table with flattened schema:
Note: Every column type here is TEXT, purely for illustration. You can map each column to its proper type by identifying the type of data it receives.
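As a rough illustration of that type mapping, the sketch below picks a column type from the Python values seen in each flattened column. The helper infer_sql_type is hypothetical, the type names (BIGINT, DOUBLE, BOOLEAN, TEXT) are generic choices rather than what the original script uses, and the rows are a made-up subset of the flattened data.

```python
def infer_sql_type(values):
    """Pick a column type wide enough for every non-null value seen."""
    kinds = set()
    for value in values:
        if value is None:
            continue
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            kinds.add("BOOLEAN")
        elif isinstance(value, int):
            kinds.add("BIGINT")
        elif isinstance(value, float):
            kinds.add("DOUBLE")
        else:
            kinds.add("TEXT")
    if not kinds:
        return "TEXT"  # only nulls seen: fall back to TEXT
    if len(kinds) == 1:
        return kinds.pop()
    if kinds == {"BIGINT", "DOUBLE"}:
        return "DOUBLE"  # mixed integers and floats fit in DOUBLE
    return "TEXT"  # any other mix is stored as text


rows = [
    {"id": 1, "location.coordinates.lat": 40.7128,
     "projects.0.budget": 100000, "contact.phone": "123-456-7890"},
    {"id": 2, "location.coordinates.lat": 51,
     "projects.0.budget": 75000, "contact.phone": None},
]
columns = sorted({key for row in rows for key in row})
schema = {c: infer_sql_type(row.get(c) for row in rows) for c in columns}
print(schema)
```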
Cons:
Too many columns: Every array element gets its own set of columns, so if an array under a key has 1,000 elements, this method generates columns for each of them, which becomes practically impossible to query. In that case, avoid this method.
Long SQL queries: You might have to SELECT many column names for basic queries.
Pros:
If your data is consistent, with a similar and small number of values per key (no more than about 30), this method can be useful to you.
If your use case revolves around a few nested key values, this approach might help you query data really fast, as each column contains just 1 value (instead of columns containing arrays).
Method 3: Flatten keys into columns and have separate table for array types:
This is by far the most logical way to flatten your JSON data: flatten object keys into columns (as in Method 1) and split arrays into their own tables. Let’s see how it’s done.
Strategy:
Flatten top level keys into a main table, including the polymorphic key.
Explode array keys (and the structs inside them) into separate tables, related to the main table through a Primary Key and Foreign Key relationship.
So, the above Data would look something like:
A. main table
_link | id | name | contact_email | contact_phone | location_city | location_coordinates_lat | location_coordinates_long |
0 | 1 | John Doe | [email protected] | 123-456-7890 | Anytown | 40.7128 | -74.006 |
1 | 2 | Jane Smith | [email protected] | None | Othertown | 51.5074 | -0.1278 |
B. projects_tasks table
_link | _link_projects | _link_main | task_name | task_id |
0.projects.0.tasks.0 | 0.projects.0 | 0 | Task A | 1 |
0.projects.0.tasks.1 | 0.projects.0 | 0 | Task B | 2 |
0.projects.1.tasks.0 | 0.projects.1 | 0 | Task C | 3 |
1.projects.0.tasks.0 | 1.projects.0 | 1 | Task D | 4 |
C. projects table
_link | _link_main | title | budget |
0.projects.0 | 0 | Project Alpha | 100000 |
0.projects.1 | 0 | Project Beta | 50000 |
1.projects.0 | 1 | Project Gamma | 75000 |
The special columns _link, _link_main and _link_projects are generated to act as primary and foreign keys for each array table that was exploded. To run SQL queries that involve multiple tables, you can join them on these columns and run your analytics query.
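To make the _link columns concrete, here is a hand-rolled sketch that produces tables shaped like the ones above. It is not how flatterer is implemented; split_into_tables is a hypothetical helper, the "value" column for arrays of scalars is an assumption, and the records are a trimmed copy of the sample data.

```python
def split_into_tables(records):
    """Flatten objects into columns and move every array into a child table."""
    tables = {"main": []}

    def fill(obj, row, prefix, table, link, parents):
        for key, value in obj.items():
            name = f"{prefix}_{key}" if prefix else key
            if isinstance(value, dict):
                # Nested object: keep flattening into the same row.
                fill(value, row, name, table, link, parents)
            elif isinstance(value, list):
                # Array: one row per element in a child table.
                child_table = name if table == "main" else f"{table}_{name}"
                child_parents = {f"_link_{table}": link, **parents}
                for i, item in enumerate(value):
                    child_link = f"{link}.{key}.{i}"
                    child_row = {"_link": child_link, **child_parents}
                    tables.setdefault(child_table, []).append(child_row)
                    if isinstance(item, dict):
                        fill(item, child_row, "", child_table, child_link, child_parents)
                    else:
                        child_row["value"] = item  # array of scalars (assumed layout)
            else:
                row[name] = value

    for index, record in enumerate(records):
        row = {"_link": str(index)}
        tables["main"].append(row)
        fill(record, row, "", "main", str(index), {})
    return tables


records = [
    {"id": 1, "name": "John Doe",
     "location": {"city": "Anytown", "coordinates": {"lat": 40.7128}},
     "projects": [
         {"title": "Project Alpha", "budget": 100000,
          "tasks": [{"task_name": "Task A", "task_id": 1},
                    {"task_name": "Task B", "task_id": 2}]},
         {"title": "Project Beta", "budget": 50000,
          "tasks": [{"task_name": "Task C", "task_id": 3}]},
     ]},
    {"id": 2, "name": "Jane Smith",
     "location": {"city": "Othertown", "coordinates": {"lat": 51.5074}},
     "projects": [
         {"title": "Project Gamma", "budget": 75000,
          "tasks": [{"task_name": "Task D", "task_id": 4}]},
     ]},
]

tables = split_into_tables(records)
for table_name, rows in tables.items():
    print(table_name, len(rows))
```

Each child row carries its own _link plus the _link of every ancestor, which is what makes the joins between main, projects and projects_tasks possible.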
Here’s a code implementation for this using flatterer
Source - MongoDB
Destination - MySQL [can be postgres or any other data warehouse / lakehouse]
The output schema and table:
And the table content will remain the same as shown above. Now, as the data is flattened into respective tables, you can easily write your analytics query.
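As an example of such a query, the sketch below loads the three tables into an in-memory SQLite database and counts tasks per project for each person. SQLite (from the Python standard library) is used here only to keep the example self-contained; it stands in for MySQL or your warehouse, and the column types are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE main (_link TEXT PRIMARY KEY, id INTEGER, name TEXT);
CREATE TABLE projects (
    _link TEXT PRIMARY KEY,
    _link_main TEXT REFERENCES main(_link),
    title TEXT,
    budget INTEGER
);
CREATE TABLE projects_tasks (
    _link TEXT PRIMARY KEY,
    _link_projects TEXT REFERENCES projects(_link),
    _link_main TEXT REFERENCES main(_link),
    task_name TEXT,
    task_id INTEGER
);
""")

# Rows copied from the tables shown above (main table trimmed to three columns).
conn.executemany("INSERT INTO main VALUES (?, ?, ?)",
                 [("0", 1, "John Doe"), ("1", 2, "Jane Smith")])
conn.executemany("INSERT INTO projects VALUES (?, ?, ?, ?)", [
    ("0.projects.0", "0", "Project Alpha", 100000),
    ("0.projects.1", "0", "Project Beta", 50000),
    ("1.projects.0", "1", "Project Gamma", 75000),
])
conn.executemany("INSERT INTO projects_tasks VALUES (?, ?, ?, ?, ?)", [
    ("0.projects.0.tasks.0", "0.projects.0", "0", "Task A", 1),
    ("0.projects.0.tasks.1", "0.projects.0", "0", "Task B", 2),
    ("0.projects.1.tasks.0", "0.projects.1", "0", "Task C", 3),
    ("1.projects.0.tasks.0", "1.projects.0", "1", "Task D", 4),
])

# Join main -> projects -> projects_tasks through the generated _link columns.
query = """
SELECT m.name, p.title, p.budget, COUNT(t._link) AS task_count
FROM main AS m
JOIN projects AS p ON p._link_main = m._link
LEFT JOIN projects_tasks AS t ON t._link_projects = p._link
GROUP BY m.name, p.title, p.budget
ORDER BY m.name, p.title
"""
result = conn.execute(query).fetchall()
for line in result:
    print(line)
```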
Method 4: Using Pandas json_normalize function
The json_normalize function in pandas is good for flattening deeply nested JSON objects but can take some time depending upon your JSON structure. It can handle various types of structures like arrays, objects, and nested fields.
Here's a detailed example that uses most of the key options of json_normalize.
All Parameters provided:
data: The input JSON object to normalize.
record_path: The path in the nested JSON that contains lists (arrays) to flatten.
meta: The list of fields (nested or otherwise) to include as metadata in the final DataFrame.
meta_prefix: A prefix to add to metadata columns.
record_prefix: A prefix to add to the flattened columns from record_path.
errors: Whether to raise errors ('raise') or ignore ('ignore').
sep: The separator for nested fields in the resulting columns.
max_level: The maximum level of nesting to normalize.
Code Implementation:
import pandas as pd
from pandas import json_normalize
# Example nested JSON
data = {
"id": 123,
"name": "John Doe",
"contact": {
"email": "[email protected]",
"phone": "555-5555"
},
"projects": [
{
"title": "Project Alpha",
"budget": 100000,
"tasks": [
{"name": "Task A", "due_date": "2024-01-01"},
{"name": "Task B", "due_date": "2024-02-01"}
]
},
{
"title": "Project Beta",
"budget": 50000,
"tasks": [
{"name": "Task C", "due_date": "2024-03-01"},
{"name": "Task D", "due_date": "2024-04-01"}
]
}
]
}
# Normalizing the JSON
df = json_normalize(
data['projects'], # Flatten the 'projects' list
record_path='tasks', # Flatten the 'tasks' array inside each project
meta=['title', 'budget'], # Add project-level metadata
meta_prefix='project_', # Prefix for project metadata
record_prefix='task_', # Prefix for task fields
errors='ignore', # Ignore errors for missing fields
max_level=2 # Setting max level of nesting to normalize
)
# Adding the top-level metadata (id, name, contact)
df['meta_id'] = data['id']
df['meta_name'] = data['name']
df['meta_contact_email'] = data['contact']['email']
df['meta_contact_phone'] = data['contact']['phone']
print(df)
Explanation of the parameters used:
record_prefix: Adds the prefix "task_" to all columns coming from the tasks array.
meta_prefix: Adds the prefix "project_" to the metadata columns listed in meta, to differentiate them from the flattened task fields.
max_level: Restricts how deeply nested fields are processed. In this case, up to 2 levels.
errors: Set to 'ignore' so that a meta key missing from some records does not raise an error.
First Normalization (json_normalize): I normalized only the projects part of the data. Inside each project, I flattened the tasks array. For each task, I included title and budget from the projects level as metadata (meta=['title', 'budget']).
Second Step (Adding top-level metadata): Since id, name, and contact are at the top level and are not inside the projects structure, they were added manually to the resulting DataFrame.
Output:
Breakdown of Output:
task_name and task_due_date: These columns come from the flattened tasks array.
project_title and project_budget: These come from the meta fields of each project, with the meta_prefix applied.
meta_id, meta_name, meta_contact_email, meta_contact_phone: These are metadata fields, extracted from the top level of the JSON and added manually.
In this example, json_normalize takes a deeply nested JSON and flattens specific parts of it while retaining the necessary metadata in the main table itself. You can use the same logic to flatten incoming data from MongoDB and keep ingesting it into MySQL as we showed above.
If you just want to flatten nested objects into columns and keep nested arrays as they are (pass max_level to limit how many levels get expanded), just have:
# Normalizing the JSON
df = json_normalize( data)
and you’ll be good to go.
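To see what the plain call produces, here is a small sketch with a made-up record. It assumes pandas is installed; sep and max_level are the json_normalize parameters described earlier.

```python
import pandas as pd

# Made-up record shaped like the example above.
record = {
    "id": 123,
    "name": "John Doe",
    "contact": {"email": "john@example.com", "phone": "555-5555"},
    "projects": [{"title": "Project Alpha", "budget": 100000}],
}

# Default call: nested objects become dotted columns, arrays stay as lists.
default_df = pd.json_normalize(record)

# Same call with "_" as the separator for nested keys.
underscore_df = pd.json_normalize(record, sep="_")

# max_level=0 expands nothing: "contact" stays a dict in a single column.
top_only_df = pd.json_normalize(record, max_level=0)

print(sorted(default_df.columns))
print(sorted(underscore_df.columns))
print(sorted(top_only_df.columns))
```

The projects column still holds the original list in all three DataFrames, so arrays need record_path (or one of the other methods) to be exploded.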
Method 5: Flattening Nested JSON in PySpark
There are 2 ways we can approach flattening using Spark:
Define the schema for our data explicitly
Let Spark infer the schema automatically and flatten it.
A. Defining the schema for our data
Working with nested JSON data in PySpark can get tricky, especially when you want to flatten deeply nested structures like arrays and structs. In this method, we'll talk about a robust solution using PySpark and why explicitly defining schemas is essential when working with such complex data.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import col, explode
spark = SparkSession.builder \
.appName("Robust Flatten JSON") \
.getOrCreate()
# Define the schema explicitly
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("contact", StructType([
StructField("email", StringType(), True),
StructField("phone", StringType(), True)
]), True),
StructField("location", StructType([
StructField("city", StringType(), True),
StructField("coordinates", StructType([
StructField("lat", DoubleType(), True),
StructField("long", DoubleType(), True)
]), True)
]), True),
StructField("projects", ArrayType(StructType([
StructField("title", StringType(), True),
StructField("budget", IntegerType(), True),
StructField("tasks", ArrayType(StructType([
StructField("task_name", StringType(), True),
StructField("task_id", IntegerType(), True)
])), True)
])), True)
])
data = [ … insert method 1 data … ]
df = spark.createDataFrame(data, schema=schema)
# First flatten out the top-level struct fields (contact, location, etc.)
df_final = df.select(
"id",
"name",
"contact.email",
"contact.phone",
"location.city",
"location.coordinates.lat",
"location.coordinates.long",
explode("projects").alias("project")
).select(
"*",
"project.title",
"project.budget",
explode("project.tasks").alias("task")
).select(
"id",
"name",
"email",
"phone",
"city",
col("lat").alias("latitude"),
col("long").alias("longitude"),
col("title").alias("project_title"),
col("budget").alias("project_budget"),
"task.task_name",
"task.task_id"
)
# Show the final flattened DataFrame
df_final.show(truncate=False)
# Stop the Spark session
spark.stop()
Output 1 :
Time Complexity Considerations
While flattening nested data, operations like explode() have a linear time complexity relative to the total number of array elements, since each element becomes one output row. If you're dealing with very large datasets, defining the schema ahead of time also helps optimize performance. It prevents Spark from having to infer the schema on the fly, which requires an extra pass over the data and can be slow and error-prone for large datasets.
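To see why the cost grows with the number of array elements, here is a plain-Python stand-in for explode(). It only imitates the row-multiplying behaviour and is not Spark code; the explode function below is written for this illustration, and the data is a made-up, simplified version of the sample.

```python
def explode(rows, column):
    """Plain-Python imitation of explode(): one output row per array element."""
    out = []
    for row in rows:
        # Rows whose array is missing, None or empty produce no output rows.
        for element in row.get(column) or []:
            new_row = dict(row)
            new_row[column] = element
            out.append(new_row)
    return out


people = [
    {"id": 1, "projects": [
        {"title": "Project Alpha", "tasks": ["Task A", "Task B"]},
        {"title": "Project Beta", "tasks": ["Task C"]},
    ]},
    {"id": 2, "projects": [{"title": "Project Gamma", "tasks": ["Task D"]}]},
    {"id": 3, "projects": []},
]

# First explode: one row per project.
by_project = explode(people, "projects")

# Lift the project fields up, then explode again: one row per task.
lifted = [
    {"id": r["id"], "title": r["projects"]["title"], "tasks": r["projects"]["tasks"]}
    for r in by_project
]
by_task = explode(lifted, "tasks")

print(len(people), len(by_project), len(by_task))
```

The output size is the total number of tasks across all projects, and the person with an empty projects array disappears, which is also how Spark's explode() treats null or empty arrays (explode_outer() keeps such rows).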
A.1 Flatten only top level fields using PySpark:
If you just want to flatten the top level fields using spark, here’s how you can do it:
# Load JSON into a Spark DataFrame
rdd = spark.sparkContext.parallelize([json_data])
df = spark.read.json(rdd)
flattened_df = df.select(
"id",
"name",
"contact",
"location",
"projects",
)
# Show the final output
flattened_df.show(truncate=False)
# Stop the Spark session
spark.stop()
Output:
B. Let spark infer the JSON schema automatically and flatten it into columns.
In this method, we allow Spark to infer the schema from the input JSON data, eliminating the need to define it explicitly. This approach simplifies handling semi-structured data. The code includes optimizations like parallel processing for efficient handling of millions of rows, intelligent column expansion to handle deeply nested structures, and array explosion for full flattening.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit
from pyspark.sql.types import StructType, ArrayType
from pyspark.sql import DataFrame
# Helper function to flatten structs and arrays fully
def flatten_df(df: DataFrame) -> DataFrame:
    complex_fields = True
    while complex_fields:
        # Identify struct or array columns
        complex_fields = [(field.name, field.dataType) for field in df.schema.fields
                          if isinstance(field.dataType, (StructType, ArrayType))]
        for col_name, col_type in complex_fields:
            if isinstance(col_type, ArrayType):
                # Explode the array column: one output row per array element
                df = df.withColumn(col_name, explode(col(col_name)))
            elif isinstance(col_type, StructType):
                # Expand the struct column into one column per field
                expanded = [col(f"{col_name}.{field.name}").alias(f"{col_name}_{field.name}")
                            for field in col_type.fields]
                df = df.select("*", *expanded).drop(col_name)
    return df
# Optimized Parallel Processing Function
def flatten_large_df(df: DataFrame, num_partitions: int = 200) -> DataFrame:
    """
    Function to flatten the DataFrame and handle large datasets with optimized parallelism.
    :param df: Input Spark DataFrame to flatten
    :param num_partitions: Number of partitions to split the data for parallel processing
    :return: Flattened DataFrame
    """
    # Repartition the DataFrame to increase parallelism
    df = df.repartition(num_partitions)
    # Apply flattening
    flattened_df = flatten_df(df)
    return flattened_df
# Initialize Spark session
spark = SparkSession.builder.appName("Optimized Flatten JSON").getOrCreate()
# Example deeply nested data (millions of rows can be handled efficiently)
data = [ use method 1 JSON data ]
# Create DataFrame with inferred schema
df = spark.read.json(spark.sparkContext.parallelize(data))
# Flatten the DataFrame using optimized parallel processing
df_flattened = flatten_large_df(df)
# Show the final flattened DataFrame
df_flattened.show(truncate=False)
# Stop the Spark session
spark.stop()
Output:
Key Optimizations Included:
Parallel Processing:
- The function flatten_large_df uses repartitioning to split the data into a higher number of partitions (num_partitions), which helps with parallel processing for larger datasets and can speed up the flattening process.
Edge Case Handling:
- Null values: explode() drops rows whose array is null or empty; use explode_outer() if you need to keep those rows.
- If there is a bad record, Spark puts it in the _corrupt_record column, so make sure your JSON is free of invalid keys or values.
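A simple way to check the input before handing it to Spark is to try parsing each line with Python's json module. The sketch below assumes one JSON object per line (JSON Lines); split_valid_invalid is a hypothetical helper, and the sample lines are made up.

```python
import json


def split_valid_invalid(lines):
    """Separate lines that parse as JSON objects from lines that do not."""
    valid, invalid = [], []
    for line_number, line in enumerate(lines, start=1):
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError as exc:
            invalid.append((line_number, str(exc)))
            continue
        if isinstance(parsed, dict):
            valid.append(line)
        else:
            invalid.append((line_number, "top-level value is not an object"))
    return valid, invalid


lines = [
    '{"id": 1, "name": "John Doe"}',
    '{"id": 2, "name": "Jane Smith"',  # missing closing brace
    '{"id": 3, "name": "Sam Lee"}',
]

valid, invalid = split_valid_invalid(lines)
print(len(valid), [number for number, _ in invalid])
```

Only the valid lines would then be passed to spark.read.json, and the invalid ones can be logged with their line numbers.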
How does Airbyte Flatten the Data?
In two ways:
No Flattening
Root Level Flattening Only
Let’s take a look at:
Source JSON:
{
"product_id": 1,
"category": {
"type": "Electronics",
"brand": "Sony"
}
}
No Normalization Output (all in _airbyte_data):
_airbyte_ab_id | _airbyte_emitted_at | _airbyte_data |
abc123 | 1622135805000 | { "product_id": 1, "category": { "type": "Electronics", "brand": "Sony" } } |
Root-Level Normalization Output:
_airbyte_ab_id | _airbyte_emitted_at | product_id | category |
abc123 | 1622135805000 | 1 | { "type": "Electronics", "brand": "Sony" } |
Here, root-level normalization expands the product_id field, but category remains a nested JSON blob. For better flattening, Airbyte integrates with dbt (data build tool), so you can write your queries and achieve custom results.
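The difference between the two outputs can be imitated in a few lines of Python. This is only an emulation of the table shapes shown above and not Airbyte's code; raw_row and root_level_row are hypothetical names.

```python
import json


def raw_row(record, ab_id, emitted_at):
    """No normalization: the whole record goes into _airbyte_data as JSON."""
    return {
        "_airbyte_ab_id": ab_id,
        "_airbyte_emitted_at": emitted_at,
        "_airbyte_data": json.dumps(record),
    }


def root_level_row(record, ab_id, emitted_at):
    """Root-level normalization: top-level keys become columns, nested values stay JSON."""
    row = {"_airbyte_ab_id": ab_id, "_airbyte_emitted_at": emitted_at}
    for key, value in record.items():
        row[key] = json.dumps(value) if isinstance(value, (dict, list)) else value
    return row


source = {"product_id": 1, "category": {"type": "Electronics", "brand": "Sony"}}

raw = raw_row(source, "abc123", 1622135805000)
root = root_level_row(source, "abc123", 1622135805000)
print(raw)
print(root)
```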
Arrays too are mapped and stored as JSON at the destination table:
Source Data Schema (from MongoDB) Inferred by Airbyte:
Destination Schema (MySQL):
Airbyte inferred the data correctly and converted them to JSON in the destination table.
Note: This root level flattening is not supported in all destination connectors.
How does HevoData Flatten the Data?
Hevo’s flattening transformation is meant to break down nested JSON into separate columns, including handling arrays. But in our testing, we found something else.
We ingested the sample JSON data (from Method 1) into MySQL as our destination.
According to Hevo, it flattens nested objects into distinct columns while indexing array values for easier processing in tabular formats.
So we went ahead and tested the flattening using Hevo:
Step 1: Log in to your Hevo account, connect a source and a destination, and sync some data.
Step 2: Go to the Transformations tab, switch to the drag-and-drop interface, and select Flatten JSON.
Step 3: The following are the filters you need to set:
Apply on all events where “Event Type Equals <your_event_name>”. The event name is usually your source table name; you can also go to “Test” and click on “Get Sample” to see it.
Apply on all fields where “Field Name Equals <FIELD>”. The FIELD here is the key you wish to expand on.
Step 4: Although Hevo claims it can do horizontal flattening of JSON types, during our testing we found some discrepancies.
Hevo was unable to cleanly parse the JSON we synced from the MongoDB source (Airbyte could) and hence could not flatten the data.
The screenshot below shows that even after the transformations, the data looks the same. Notice that “projects” is a JSON field and, according to Hevo’s Flatten JSON doc, it should have been flattened.
Our data as stored in MongoDB.
Hevo gives us an option to choose the mappings: either replicate the incoming JSON as JSON, or store JSON as strings and arrays as strings. So we tried this option to see whether flattening is possible using transformations.
We went ahead with option 2 this time, as we had some array fields.
But those array fields got mapped as “varchar” and not as real arrays, and hence there cannot be any flattening. You can’t even change the column type from the schema mapper with manual schema settings, as Hevo inferred ARRAY as a STRING field.
How does Fivetran Flatten the Data?
Fivetran natively offers the transformations mentioned below through its quickstart data models, or by integrating dbt or Coalesce. The native transformations support BigQuery, Databricks, Postgres, Redshift and Snowflake only; if you want to use them, you must be on one of these warehouses, or else use dbt.
Image Source: Fivetran
JSON Delivery Modes for handling JSON or JSONL files:
Packed Mode:
Loads all JSON data into a single _data column.
Maintains the original nested structure without flattening.
Useful for retaining full data hierarchy for complex queries.
Unpacked Mode:
Flattens JSON data by one level into individual columns, like Airbyte.
Automatically infers column data types.
Ideal for creating a simplified, tabular view for easier analysis.
They suggest using the LATERAL FLATTEN function of Snowflake, the flattening support of your other destination (for example, BigQuery), or their native transformations when you need deeper JSON flattening.
Conclusion
To wrap things up, flattening nested JSON isn't a one-size-fits-all situation. The best method depends on your data's structure and what you want to do with it. We've gone through a bunch of ways to flatten JSON, from simple to complex, and even looked at how popular tools handle it.
Remember, if your data is super nested and you need serious flattening power, PySpark's your best bet. But for simpler cases, pandas or even basic Python can do the trick.
The key is to understand your data and choose the method that makes your life easier. So, go ahead and experiment with these techniques, and soon you'll be a JSON flattening pro! Happy data wrangling!
I’d love to hear your thoughts about this, so feel free to reach out to me on LinkedIn or respond in the comments below.