Building a Data Cleaning Agent with LangGraph: A Practical Example

Dirty data can be a nightmare—missing values, inconsistent formats, or typos can derail your analysis. Imagine an AI that automatically cleans your data, validates its quality, and retries until it’s spotless. LangGraph, a dynamic library from the LangChain team, makes this possible with its stateful, graph-based workflows. In this beginner-friendly guide, we’ll walk you through building a data cleaning agent using LangGraph that processes a dataset, applies cleaning rules, and ensures data quality through iterative checks. With clear code examples, a conversational tone, and practical steps, you’ll create an AI-powered data cleaner, even if you’re new to coding!


What is a Data Cleaning Agent in LangGraph?

A data cleaning agent in LangGraph is an AI application designed to:

  • Accept a dataset (e.g., a list of records with potential errors).
  • Apply cleaning rules (e.g., fill missing values, standardize formats) using a language model (like those from OpenAI).
  • Validate the cleaned data’s quality (e.g., check for consistency).
  • Store the cleaning history to track changes.
  • Loop back to refine cleaning if issues persist, up to a set number of attempts.

LangGraph’s nodes (tasks), edges (connections), and state (shared data) enable a flexible workflow that adapts to data quality needs, making it ideal for iterative data processing.

This example demonstrates LangGraph’s ability to handle multi-step, dynamic tasks and can be extended with tools or custom logic. To get started with LangGraph, see Introduction to LangGraph.


What You’ll Build

Our data cleaning agent will: 1. Take a dataset (e.g., a list of customer records with names and emails). 2. Store the dataset and cleaning history in the state. 3. Clean the data using an AI model to fix missing values, typos, or formats. 4. Check if the cleaned data meets quality criteria (simulated for this example). 5. Loop back to refine cleaning if issues remain, up to three attempts. 6. End when the data is clean or attempts are exhausted.

We’ll use LangGraph for the workflow and LangChain for memory, AI, and prompt management.


Prerequisites

Before starting, ensure you have:

  • Python 3.8+: Installed and verified with python --version.
  • LangGraph and LangChain: Installed via pip.
  • OpenAI API Key: For the language model (or use a free model from Hugging Face).
  • Virtual Environment: To manage dependencies.

Install the required packages:

pip install langgraph langchain langchain-openai python-dotenv

Set up your OpenAI API key in a .env file:

echo "OPENAI_API_KEY=your-api-key-here" > .env

For setup details, see Install and Setup and Security and API Keys.


Building the Data Cleaning Agent

Let’s create a LangGraph workflow for the data cleaning agent. We’ll define the state, nodes, edges, and graph, then run it to clean a sample dataset.

Step 1: Define the State

The state holds the dataset, cleaned data, quality status, cleaning history, and attempt count to manage iterations.

from typing import TypedDict, List
from langchain_core.messages import HumanMessage, AIMessage

class State(TypedDict):
    dataset: List[dict]         # Input dataset (e.g., list of records)
    cleaned_data: List[dict]    # Cleaned dataset
    is_clean: bool              # True if data meets quality criteria
    cleaning_history: List[str] # List of cleaning actions/messages
    attempt_count: int          # Number of cleaning attempts

The cleaning_history tracks changes for context, and attempt_count prevents infinite loops. Learn more at State Management.

Step 2: Create Nodes

We’ll use four nodes:

  • process_dataset: Stores the dataset and initializes the history.
  • clean_data: Applies cleaning rules using an AI model.
  • check_quality: Evaluates if the cleaned data is valid (simulated).
  • decide_next: Decides whether to end or retry.
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
import logging
import json

# Setup logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Node 1: Process dataset
def process_dataset(state: State) -> State:
    logger.info("Processing dataset")
    if not state["dataset"]:
        logger.error("Empty dataset")
        raise ValueError("Dataset is required")
    state["cleaning_history"].append(f"Received dataset: {json.dumps(state['dataset'][:2])}...")
    state["attempt_count"] = 0
    logger.debug(f"Initial state: {state}")
    return state

# Node 2: Clean data
def clean_data(state: State) -> State:
    logger.info("Cleaning data")
    try:
        llm = ChatOpenAI(model="gpt-3.5-turbo")
        template = PromptTemplate(
            input_variables=["dataset", "history"],
            template="Dataset: {dataset}\nCleaning history: {history}\nClean the dataset by filling missing values, fixing typos, and standardizing email formats. Return a JSON list of cleaned records."
        )
        history_str = "\n".join(state["cleaning_history"])
        chain = template | llm
        cleaned_json = chain.invoke({
            "dataset": json.dumps(state["dataset"]),
            "history": history_str
        }).content
        cleaned_data = json.loads(cleaned_json)
        state["cleaned_data"] = cleaned_data
        state["cleaning_history"].append(f"Applied cleaning: {cleaned_json[:100]}...")
        state["attempt_count"] += 1
        logger.debug(f"Cleaned data: {cleaned_data}")
    except Exception as e:
        logger.error(f"Cleaning error: {str(e)}")
        state["cleaned_data"] = state["dataset"]
        state["cleaning_history"].append(f"Error during cleaning: {str(e)}")
    return state

# Node 3: Check data quality (simulated)
def check_quality(state: State) -> State:
    logger.info("Checking data quality")
    # Simulate quality check: ensure no missing values and valid emails
    is_clean = True
    for record in state["cleaned_data"]:
        if not record.get("name") or not record.get("email"):
            is_clean = False
            break
        if "@" not in record["email"] or "." not in record["email"]:
            is_clean = False
            break
    state["is_clean"] = is_clean
    if not is_clean:
        state["cleaning_history"].append("Quality check failed: Missing values or invalid emails detected")
    logger.debug(f"Clean: {state['is_clean']}")
    return state

# Node 4: Decide next step
def decide_next(state: State) -> str:
    if state["is_clean"] or state["attempt_count"] >= 3:
        logger.info("Ending workflow: clean or max attempts reached")
        return "end"
    logger.info("Looping back to refine cleaning")
    return "clean_data"
  • process_dataset: Validates the dataset, adds it to history, and initializes attempt_count.
  • clean_data: Uses the AI to clean the dataset (fix missing values, typos, email formats) and updates the state.
  • check_quality: Simulates a quality check by ensuring no missing values and valid emails.
  • decide_next: Decides to end or retry based on quality or attempts.

For AI integration, see OpenAI Integration.

Step 3: Define Edges

The workflow flows as follows:

  • Direct Edges: From process_dataset to clean_data, then to check_quality.
  • Conditional Edge: From check_quality, either end or loop back to clean_data.

Step 4: Build the Workflow

The graph connects nodes and edges:

from langgraph.graph import StateGraph, END

# Build the graph
graph = StateGraph(State)
graph.add_node("process_dataset", process_dataset)
graph.add_node("clean_data", clean_data)
graph.add_node("check_quality", check_quality)
graph.add_edge("process_dataset", "clean_data")
graph.add_edge("clean_data", "check_quality")
graph.add_conditional_edges("check_quality", decide_next, {
    "end": END,
    "clean_data": "clean_data"
})
graph.set_entry_point("process_dataset")

# Compile the graph
app = graph.compile()

Step 5: Run the Data Cleaning Agent

Test the agent with a sample dataset:

from dotenv import load_dotenv
import os

load_dotenv()

# Sample dataset with issues
dataset = [
    {"name": "John Doe", "email": "johndoe@example.com"},
    {"name": "", "email": "jane.doe@exampl.com"},
    {"name": "Bob Smith", "email": "bob@invalid"}
]

# Run the workflow
try:
    result = app.invoke({
        "dataset": dataset,
        "cleaned_data": [],
        "is_clean": False,
        "cleaning_history": [],
        "attempt_count": 0
    })
    print("Cleaned Data:", result["cleaned_data"])
    print("Cleaning History:", result["cleaning_history"])
except Exception as e:
    logger.error(f"Workflow error: {str(e)}")

Example Output:

Cleaned Data: [
    {"name": "John Doe", "email": "johndoe@example.com"},
    {"name": "Jane Doe", "email": "jane.doe@example.com"},
    {"name": "Bob Smith", "email": "bob.smith@example.com"}
]
Cleaning History: [
    "Received dataset: [{\"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}, {\"name\": \"\", \"email\": \"jane.doe@exampl.com\"}]...",
    "Applied cleaning: [{\"name\": \"John Doe\", \"email\": \"johndoe@example.com\"}, {\"name\": \"Jane Doe\", \"email\": \"jane.doe@example.com\"}]..."
]

Step 6: Simulate an Interactive Cleaning Session

To make the agent interactive, create a loop for multiple dataset submissions:

# Initialize state
state = {
    "dataset": [],
    "cleaned_data": [],
    "is_clean": False,
    "cleaning_history": [],
    "attempt_count": 0
}

# Interactive loop
print("Welcome to the Data Cleaning Agent! Submit your dataset (JSON string) or type 'exit' to quit.")
while True:
    user_input = input("Your dataset (or 'exit'): ")
    if user_input.lower() in ["exit", "quit"]:
        break
    try:
        state["dataset"] = json.loads(user_input)
        result = app.invoke(state)
        print("Cleaned Data:", result["cleaned_data"])
        if result["is_clean"]:
            print("Data is clean! Submit a new dataset or exit.")
        elif result["attempt_count"] >= 3:
            print("Max attempts reached. Please revise and resubmit.")
        state = result  # Update state with new history
    except Exception as e:
        print(f"Error: {str(e)}")

Example Interaction:

Welcome to the Data Cleaning Agent! Submit your dataset (JSON string) or type 'exit' to quit.
Your dataset: [{"name": "John Doe", "email": "johndoe@example.com"}, {"name": "", "email": "jane.doe@exampl.com"}]
Cleaned Data: [{"name": "John Doe", "email": "johndoe@example.com"}, {"name": "Jane Doe", "email": "jane.doe@example.com"}]
Data is clean! Submit a new dataset or exit.
Your dataset: exit

What’s Happening?

  • The state persists the cleaning_history, enabling context-aware cleaning.
  • Nodes process the dataset, apply cleaning, check quality, and decide next steps.
  • Edges create a flow that loops back if data isn’t clean, up to three attempts.
  • The workflow is robust, with logging and error handling for reliability.

For more on dynamic flows, see Looping and Branching.


Debugging Common Issues

If the agent encounters issues, try these debugging tips:

  • No Cleaning: Verify the OPENAI_API_KEY is set. See Security and API Keys.
  • Infinite Loop: Check attempt_count in decide_next to ensure the loop limit is enforced. See Graph Debugging.
  • Invalid JSON: Log cleaned_data in clean_data to confirm the AI returns valid JSON.
  • Poor Quality Check: Adjust criteria in check_quality or refine the prompt in clean_data.

Enhancing the Data Cleaning Agent

Extend the agent with LangChain features:

For example, add a node to validate emails with a custom tool or fetch data standards with Web Research Chain.

To deploy the agent as an API, see Deploying Graphs.


Best Practices for Data Cleaning Agents

  • Focused Nodes: Each node should handle one task (e.g., input, cleaning, quality check). See Workflow Design.
  • Robust State: Validate state data to avoid errors. Check State Management.
  • Clear Logging: Use logging to trace issues. See Graph Debugging.
  • Limit Retries: Cap attempts to prevent endless loops. Check Looping and Branching.
  • Test Scenarios: Try various datasets to ensure robust cleaning. See Best Practices.

Conclusion

Building a data cleaning agent with LangGraph is a powerful way to leverage stateful, graph-based workflows for real-world data processing. By structuring the cleaning process with nodes, edges, and a persistent state, you’ve created an AI that transforms messy data into clean, usable records with intelligence and adaptability. This example is a foundation for more advanced agents with tools, dynamic decisions, or cloud deployment.

To begin, follow Install and Setup and try this data cleaning agent. For more, explore Core Concepts or related projects like Customer Support Example. For inspiration, check real-world applications at Best LangGraph Uses. With LangGraph, your data cleaning agent is ready to tidy up and make data sparkle!

External Resources: