Build an AI lead qualification pipeline in Python¶
This notebook demonstrates a multi-stage screening workflow that interleaves everyrow operations with pandas data transformations.
Use Case: Qualify investment fund leads for a B2B research tools company. The workflow:
- Score funds by "contrarian" research approach (likely to adopt new tools)
- Filter to high-scoring candidates using pandas
- Research team sizes for remaining candidates
- Apply nuanced inclusion logic: include funds with strong research signals OR very small teams
Why this approach? Traditional tools force binary choices. This workflow captures the nuanced mental model: "I want funds that show research-tool-adoption signals, but I'll also include tiny funds where even weak signals matter."
import asyncio
from dotenv import load_dotenv
load_dotenv()
import pandas as pd
from pydantic import BaseModel, Field
from everyrow import create_session
from everyrow.ops import rank, screen
Load Investment Fund Data¶
funds_df = pd.read_csv("../data/investment_funds.csv")
print(f"Loaded {len(funds_df)} funds")
funds_df.head(10)
Stage 1: Rank Funds by Research Tool Adoption Likelihood¶
First, we score each fund on how likely it is to adopt new research tools, based on its investment approach and research intensity.
CONTRARIAN_SCORING_TASK = """
Score each investment fund from 0-100 on their likelihood to adopt new research tools.
HIGH scores (70-100) for funds that:
- Emphasize proprietary/primary research
- Mention reading documents, reports, filings manually
- Have research-intensive strategies (fundamental analysis, deep dives)
- Express need for research edge or differentiation
- Have smaller teams that need to punch above their weight
MEDIUM scores (40-69) for funds that:
- Do some research but also rely on quantitative/systematic approaches
- Have mixed strategies
LOW scores (0-39) for funds that:
- Are fully systematic/algorithmic with no fundamental research
- Passive/index funds
- Explicitly mention no human research or automated-only approaches
"""
async def stage1_score_funds(session, df):
    """Score funds by research tool adoption likelihood."""
    print("Stage 1: Scoring funds by research tool adoption likelihood...")
    result = await rank(
        session=session,
        task=CONTRARIAN_SCORING_TASK,
        input=df,
        field_name="score",
    )
    return result.data
Stage 2: Filter Using Pandas¶
Apply a threshold to focus on high-potential leads. We keep funds scoring 50+ for further analysis.
def stage2_filter_by_score(df, threshold=50):
    """Filter to funds above the score threshold."""
    print(f"\nStage 2: Filtering to funds with score >= {threshold}...")
    filtered = df[df["score"] >= threshold].copy()
    print(f" {len(df)} funds -> {len(filtered)} funds after filtering")
    print(f" Removed: {len(df) - len(filtered)} low-score funds")
    return filtered
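The thresholding step is plain pandas, so it is easy to sanity-check in isolation. A quick sketch on a toy frame (the fund names and scores below are invented for illustration):

```python
import pandas as pd

# Toy scores to sanity-check the >= 50 cutoff
toy = pd.DataFrame({
    "fund_name": ["Alpha Deep Value", "Beta Quant", "Gamma Micro"],
    "score": [82, 35, 50],
})

# Boolean mask keeps rows at or above the threshold; .copy() avoids
# SettingWithCopyWarning when the result is mutated later
kept = toy[toy["score"] >= 50].copy()
print(kept["fund_name"].tolist())  # ['Alpha Deep Value', 'Gamma Micro']
```

Note that the cutoff is inclusive: a fund scoring exactly 50 survives the filter, which matches the ">= 50" wording above.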
Stage 3: Research Team Sizes¶
For the remaining funds, we want to know their team size. Smaller teams are often more accessible and more likely to try new tools.
TEAM_SIZE_TASK = """
Estimate the investment team size for each fund based on the available information.
Look for clues like:
- Explicit mentions of team size ("two-person team", "solo GP")
- AUM relative to strategy complexity
- Website descriptions mentioning analysts, partners, etc.
Provide your best estimate as a number. If a range, use the midpoint.
For very small operations, 1-3 is typical.
For larger funds, 10-50+ is common.
"""
async def stage3_research_team_size(session, df):
    """Research and estimate team sizes."""
    print("\nStage 3: Researching team sizes...")
    result = await rank(
        session=session,
        task=TEAM_SIZE_TASK,
        input=df,
        field_name="team_size_estimate",
    )
    return result.data
Stage 4: Apply Nuanced Inclusion Logic¶
The final screening applies nuanced logic that captures our actual mental model:
- Include if strong research signals (score >= 70)
- Also include if very small team (<= 5 people), even with weaker signals
This captures the insight that tiny teams are often more accessible and more desperate for research tools, even if their website doesn't explicitly mention research needs.
class InclusionResult(BaseModel):
    """Schema for final inclusion decision."""

    include: bool = Field(
        description="Whether to include this fund in the final outreach list"
    )
    inclusion_reason: str = Field(
        description="Why this fund was included or excluded"
    )
    priority: str = Field(
        description="high, medium, or low priority for outreach"
    )
INCLUSION_TASK = """
Decide whether to include each fund in the final outreach list for a B2B research tools sale.
INCLUDE a fund if EITHER:
1. They have a high research tool adoption score (>= 70) - these are obvious fits
2. They have a very small team (<= 5 people) - small teams are accessible and need tools
PRIORITY levels:
- HIGH: Score >= 70 AND small team - best of both worlds
- MEDIUM: Score >= 70 OR small team (but not both)
- LOW: Included but borderline
EXCLUDE funds that don't meet either criterion.
"""
async def stage4_final_screening(session, df):
    """Apply final inclusion logic."""
    print("\nStage 4: Applying final inclusion logic...")
    result = await screen(
        session=session,
        task=INCLUSION_TASK,
        input=df,
        response_model=InclusionResult,
    )
    return result.data
Run the Complete Workflow¶
async def run_full_workflow():
    """Execute the complete multi-stage screening workflow."""
    async with create_session(name="Multi-Stage Lead Screening") as session:
        print(f"Session URL: {session.get_url()}")
        print("=" * 60)

        # Stage 1: Score by research tool adoption
        scored_df = await stage1_score_funds(session, funds_df)

        # Stage 2: Filter by score threshold (pandas)
        filtered_df = stage2_filter_by_score(scored_df, threshold=50)

        # Stage 3: Research team sizes
        with_team_size_df = await stage3_research_team_size(session, filtered_df)

        # Stage 4: Final screening with nuanced logic
        final_df = await stage4_final_screening(session, with_team_size_df)

        return final_df
final_results = await run_full_workflow()
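The bare `await` above works because Jupyter/IPython supplies a running event loop. In a plain `.py` script there is none, so the entry point goes through `asyncio.run` instead. A minimal sketch (the real workflow call is commented out so the sketch runs standalone):

```python
import asyncio

async def main():
    # In the script version, this is where the notebook's top-level await goes:
    # final_results = await run_full_workflow()
    return "workflow complete"

# asyncio.run creates the event loop, runs the coroutine, and tears it down
result = asyncio.run(main())
print(result)
```

This is also why `import asyncio` appears in the first cell even though the notebook itself never calls it directly.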
Analyze Final Results¶
# Filter to included funds
included_funds = final_results[final_results["include"]].copy()
print(f"\n{'='*60}")
print(f"FINAL RESULTS: {len(included_funds)} funds qualified for outreach")
print(f"{'='*60}\n")
# Show included funds
print("QUALIFIED FUNDS:")
print("-" * 50)
for _, row in included_funds.iterrows():
    print(f" {row['fund_name']}")
    team_size = row.get('team_size_estimate', 'N/A')
    score = row.get('score', 'N/A')
    print(f" AUM: ${row['aum_millions']}M | Team: ~{team_size} | Score: {score}")
    print()
# Summary statistics
print("\nWORKFLOW SUMMARY:")
print(f" Started with: {len(funds_df)} funds")
print(f" After score filter: {len(final_results)} funds")
print(f" Final qualified leads: {len(included_funds)} funds")
# Export the final list
included_funds.to_csv("qualified_leads.csv", index=False)
print(f"\nExported {len(included_funds)} qualified leads to qualified_leads.csv")
# Display full results table
final_results[["fund_name", "aum_millions", "score", "team_size_estimate", "include"]]
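If the screen output also carries the `priority` field from `InclusionResult` (the column selection above doesn't display it, so this is an assumption), the qualified list can be summarized by outreach priority. A sketch on toy data standing in for `final_results` (all values invented):

```python
import pandas as pd

# Toy stand-in for final_results with the InclusionResult columns
toy_results = pd.DataFrame({
    "fund_name": ["A", "B", "C", "D"],
    "include": [True, True, True, False],
    "priority": ["high", "medium", "high", "low"],
})

# Count priorities among the included funds only
breakdown = toy_results.loc[toy_results["include"], "priority"].value_counts()
print(breakdown.to_dict())  # {'high': 2, 'medium': 1}
```

Sorting the outreach export by this priority column is a natural next step before handing the CSV to a sales team.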