CRM Duplicate Prevention Schema
Deduplication logic and matching algorithms for maintaining data integrity in automated CRM pipelines and lead management systems.
Explanation
This schema defines a deduplication framework for CRM systems that detects and resolves duplicate records across multiple data sources. Matching rules compare several identity signals, each with a configurable weight, and the weighted scores are combined into a single confidence value so that thresholds can be tuned to trade missed duplicates against false positives.
The system supports both strict deterministic matching (exact email, phone) and fuzzy heuristic matching (name variations, company similarities) to handle real-world data quality issues. Each matching rule produces a confidence score, and records exceeding the threshold are flagged for automated merging or manual review.
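As a rough illustration of that last step, the sketch below maps a confidence score to an outcome. The function name `route_match` and the `no_action` label are hypothetical; the 0.95 and 0.75 cutoffs are the ones hard-coded in the pipeline listing later in this document, and the other two labels come from the schema's `action` enum.

```python
def route_match(confidence, auto_merge_at=0.95, review_at=0.75):
    """Map a 0-1 confidence score to an outcome (hypothetical helper)."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0 and 1")
    if confidence >= auto_merge_at:
        return "auto_merge"    # high confidence: merge without human input
    if confidence >= review_at:
        return "flag_review"   # plausible duplicate: queue for a person
    return "no_action"         # assumed label for scores below both cutoffs


if __name__ == "__main__":
    for score in (0.97, 0.80, 0.40):
        print(score, route_match(score))
```

Keeping the cutoffs as parameters rather than constants makes it easier to adjust them once manual review outcomes are available.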
Duplicate Detection Schema
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "DuplicateDetectionRule",
  "type": "object",
  "properties": {
    "rule_id": {
      "type": "string",
      "format": "uuid",
      "description": "Unique identifier for this rule"
    },
    "name": {
      "type": "string",
      "description": "Human-readable rule name"
    },
    "description": {
      "type": "string",
      "description": "Detailed explanation of what this rule detects"
    },
    "enabled": {
      "type": "boolean",
      "default": true
    },
    "match_type": {
      "type": "string",
      "enum": ["exact", "fuzzy", "phonetic", "composite"],
      "description": "Type of matching algorithm to apply"
    },
    "fields": {
      "type": "array",
      "items": { "type": "string" },
      "description": "CRM field names to compare"
    },
    "weight": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Importance weight for this rule in composite scoring"
    },
    "threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Minimum score to consider records a match"
    },
    "action": {
      "type": "string",
      "enum": ["auto_merge", "flag_review", "block_import"],
      "default": "flag_review"
    },
    "priority": {
      "type": "integer",
      "default": 100,
      "description": "Rule evaluation order (lower = higher priority)"
    },
    "conditions": {
      "type": "object",
      "description": "Additional conditions that must be met",
      "properties": {
        "same_account": { "type": "boolean" },
        "created_within_hours": { "type": "integer" },
        "exclude_deleted": { "type": "boolean", "default": true }
      }
    },
    "normalization": {
      "type": "object",
      "description": "Data normalization rules before comparison",
      "properties": {
        "lowercase": { "type": "boolean" },
        "trim_whitespace": { "type": "boolean", "default": true },
        "remove_special_chars": { "type": "boolean" },
        "standardize_phone_format": { "type": "boolean" }
      }
    }
  }
}
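To make a few of the schema's constraints concrete, here is a minimal standard-library sketch that checks a rule dictionary by hand. `validate_rule` is a hypothetical helper, not part of the schema; it covers only the enum and range constraints, and a real deployment would more likely hand the schema to a JSON Schema validator.

```python
MATCH_TYPES = {"exact", "fuzzy", "phonetic", "composite"}
ACTIONS = {"auto_merge", "flag_review", "block_import"}


def _is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_rule(rule):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    # The schema declares no required properties, so only check keys that exist
    if "match_type" in rule and rule["match_type"] not in MATCH_TYPES:
        problems.append(f"match_type must be one of {sorted(MATCH_TYPES)}")
    if "action" in rule and rule["action"] not in ACTIONS:
        problems.append(f"action must be one of {sorted(ACTIONS)}")
    for key in ("weight", "threshold"):
        if key in rule:
            value = rule[key]
            if not _is_number(value) or not 0 <= value <= 1:
                problems.append(f"{key} must be a number between 0 and 1")
    if "fields" in rule:
        fields = rule["fields"]
        if not isinstance(fields, list) or not all(isinstance(f, str) for f in fields):
            problems.append("fields must be a list of strings")
    return problems


if __name__ == "__main__":
    print(validate_rule({"match_type": "regex", "weight": 1.5}))
```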
Matching Algorithms
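# Comprehensive Duplicate Detection Engine
import re

# Soundex and levenshtein_distance are not defined in this listing; they are
# assumed to come from a phonetic-encoding / string-distance library.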
class DuplicateDetector:
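    def __init__(self, overall_threshold=0.5):
        self.rules = self.load_rules()
        self.phonetic_encoder = Soundex()  # or Metaphone, NYSIIS
        # Minimum composite confidence for reporting a potential duplicate.
        # detect_duplicates() reads this attribute; the 0.5 default is an
        # assumed placeholder and should be tuned.
        self.overall_threshold = overall_threshold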
    def detect_duplicates(self, new_record, existing_records):
        """
        Compare new record against existing records
        Returns list of potential duplicates with confidence scores
        """
        potential_matches = []
        for existing in existing_records:
            if existing.id == new_record.id:
                continue
            total_score = 0
            max_possible = 0
            matched_rules = []
            for rule in sorted(self.rules, key=lambda r: r.priority):
                if not rule.enabled:
                    continue
                rule_score = self.evaluate_rule(rule, new_record, existing)
                weighted_score = rule_score * rule.weight
                total_score += weighted_score
                max_possible += rule.weight
                if rule_score >= rule.threshold:
                    matched_rules.append({
                        'rule_id': rule.rule_id,
                        'rule_name': rule.name,
                        'score': rule_score,
                        'threshold': rule.threshold
                    })
            # Calculate composite confidence score (0-1)
            if max_possible > 0:
                confidence = min(total_score / max_possible, 1.0)
            else:
                confidence = 0
            if confidence >= self.overall_threshold:
                potential_matches.append({
                    'record': existing,
                    'confidence': confidence,
                    'matched_rules': matched_rules
                })
        # Sort by confidence (highest first)
        return sorted(potential_matches,
                      key=lambda x: x['confidence'],
                      reverse=True)
    def evaluate_rule(self, rule, record_a, record_b):
        """
        Evaluate a single matching rule against two records
        Returns score from 0 to 1
        """
        scores = []
        for field in rule.fields:
            val_a = self.get_field_value(record_a, field)
            val_b = self.get_field_value(record_b, field)
            # Fields missing on either side are skipped, not scored as 0
            if val_a is None or val_b is None:
                continue
            # Apply normalization
            val_a = self.normalize_value(val_a, rule.normalization)
            val_b = self.normalize_value(val_b, rule.normalization)
            # Calculate field-level score based on match type
            if rule.match_type == 'exact':
                field_score = 1.0 if val_a == val_b else 0.0
            elif rule.match_type == 'fuzzy':
                field_score = self.fuzzy_string_match(val_a, val_b)
            elif rule.match_type == 'phonetic':
                encoded_a = self.phonetic_encoder.encode(val_a)
                encoded_b = self.phonetic_encoder.encode(val_b)
                field_score = 1.0 if encoded_a == encoded_b else 0.0
            elif rule.match_type == 'composite':
                field_score = self.composite_match(val_a, val_b)
            else:
                # Without this branch field_score would be unbound below
                raise ValueError(f"Unknown match_type: {rule.match_type!r}")
            scores.append(field_score)
        # Return average score for all fields in this rule
        return sum(scores) / len(scores) if scores else 0
    def fuzzy_string_match(self, str_a, str_b):
        """
        Calculate string similarity using Levenshtein distance
        Returns score from 0 (no match) to 1 (identical)
        """
        if not str_a or not str_b:
            return 0
        # Calculate edit distance
        distance = levenshtein_distance(str_a.lower(),
                                        str_b.lower())
        # Normalize to 0-1 range
        max_len = max(len(str_a), len(str_b))
        if max_len == 0:
            return 1
        similarity = 1 - (distance / max_len)
        # Apply non-linear scaling (more tolerant for long strings)
        if max_len >= 10:
            similarity = similarity ** 0.8
        return max(0, min(1, similarity))
    def normalize_value(self, value, normalization_rules):
        """
        Apply normalization rules to a value
        """
        if not isinstance(value, str):
            value = str(value)
        if normalization_rules:
            if normalization_rules.get('trim_whitespace', True):
                value = value.strip()
            if normalization_rules.get('lowercase', False):
                value = value.lower()
            if normalization_rules.get('remove_special_chars', False):
                value = re.sub(r'[^a-zA-Z0-9\s]', '', value)
            if normalization_rules.get('standardize_phone_format', False):
                value = self.standardize_phone(value)
        return value
    def standardize_phone(self, phone):
        """
        Best-effort conversion to an E.164-style string.
        Assumes a US number when exactly 10 digits are given; any other
        input only gets a '+' prefix and is not validated.
        """
        # Remove all non-digit characters
        digits = re.sub(r'\D', '', str(phone))
        # A 10-digit number is treated as a US number without country code
        if len(digits) == 10:
            digits = '1' + digits
        # An 11-digit number starting with '1' already carries the US country
        # code; every other length is passed through unchanged.
        return '+' + digits
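The fuzzy matcher above calls `levenshtein_distance`, which the listing does not define. The following is a minimal pure-Python sketch, assuming the classic insert/delete/substitute edit distance is what is intended. `similarity` is a hypothetical standalone version of `fuzzy_string_match`, included so the scaling step can be run in isolation.

```python
def levenshtein_distance(a, b):
    """Edit distance via dynamic programming, keeping two rows in memory."""
    if len(a) < len(b):
        a, b = b, a  # iterate over the longer string, store rows of the shorter
    previous = list(range(len(b) + 1))
    for i, ch_a in enumerate(a, start=1):
        current = [i]
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (or match)
            ))
        previous = current
    return previous[-1]


def similarity(str_a, str_b):
    """Standalone mirror of fuzzy_string_match (hypothetical helper)."""
    if not str_a or not str_b:
        return 0.0
    distance = levenshtein_distance(str_a.lower(), str_b.lower())
    max_len = max(len(str_a), len(str_b))
    score = 1 - distance / max_len
    if max_len >= 10:
        score = score ** 0.8  # exponent below 1 lifts scores of long strings
    return max(0.0, min(1.0, score))


if __name__ == "__main__":
    print(levenshtein_distance("kitten", "sitting"))  # 3
    print(similarity("Acme Corporation", "Acme Corporatoin"))
```

Because the exponent is below 1, two long strings that differ by a couple of characters score somewhat higher than the raw ratio alone would give, which is the tolerance the original comment describes.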
Field-Specific Matching Rules
# Predefined matching rules for common CRM fields
# (Python literal: booleans are True/False, not JSON true/false)
STANDARD_MATCHING_RULES = [
    {
        "rule_id": "email_exact_match",
        "name": "Email Exact Match",
        "description": "Matches when email addresses are identical (case-insensitive)",
        "match_type": "exact",
        "fields": ["email"],
        "weight": 1.0,
        "threshold": 0.95,
        "action": "auto_merge",
        "priority": 1,
        "normalization": {
            "lowercase": True,
            "trim_whitespace": True
        }
    },
    {
        "rule_id": "phone_exact_match",
        "name": "Phone Number Exact Match",
        "description": "Matches when phone numbers normalize to same value",
        "match_type": "exact",
        "fields": ["phone", "mobile_phone"],
        "weight": 1.0,
        "threshold": 0.95,
        "action": "auto_merge",
        "priority": 1,
        "normalization": {
            "standardize_phone_format": True
        }
    },
    {
        "rule_id": "domain_name_fuzzy",
        "name": "Company Domain + Name Fuzzy",
        "description": "Matches same domain with fuzzy name matching",
        "match_type": "composite",
        "fields": ["company_domain", "company_name"],
        "weight": 0.8,
        "threshold": 0.75,
        "action": "flag_review",
        "priority": 2,
        "conditions": {
            "same_account": False
        }
    },
    {
        "rule_id": "name_phonetic",
        "name": "Name Phonetic Match",
        "description": "Matches similar-sounding names using phonetic encoding",
        "match_type": "phonetic",
        "fields": ["first_name", "last_name"],
        "weight": 0.5,
        "threshold": 0.7,
        "action": "flag_review",
        "priority": 3,
        "conditions": {
            "created_within_hours": 24
        }
    },
    {
        "rule_id": "company_duplicate",
        "name": "Company Name Fuzzy Match",
        "description": "Detects duplicate companies with name variations",
        "match_type": "fuzzy",
        "fields": ["company_name"],
        "weight": 0.6,
        "threshold": 0.8,
        "action": "flag_review",
        "priority": 2,
        "normalization": {
            "lowercase": True,
            "remove_special_chars": True,
            "trim_whitespace": True
        }
    }
]
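It can help to work through how these weights interact with the scoring loop in `detect_duplicates`. That loop adds every enabled rule's weight to the denominator, including rules that score zero, so a single strong signal is diluted by the others. The sketch below reproduces only that arithmetic; `composite_confidence` is a hypothetical helper and the weights are copied from the rules above.

```python
WEIGHTS = {
    "email_exact_match": 1.0,
    "phone_exact_match": 1.0,
    "domain_name_fuzzy": 0.8,
    "name_phonetic": 0.5,
    "company_duplicate": 0.6,
}


def composite_confidence(rule_scores, weights):
    """Weighted average of rule scores, as computed in detect_duplicates."""
    total = sum(rule_scores.get(rule_id, 0.0) * w for rule_id, w in weights.items())
    max_possible = sum(weights.values())
    return min(total / max_possible, 1.0) if max_possible > 0 else 0.0


# Only the email rule fires; every other rule scores 0
email_only = {"email_exact_match": 1.0}

if __name__ == "__main__":
    print(round(composite_confidence(email_only, WEIGHTS), 3))  # 0.256
```

With these weights an exact email match on its own yields roughly 0.26, well below a 0.95 auto-merge cutoff. If that is not the intended behavior, one option is to leave rules out of the denominator when neither record has data for their fields; another is to let a matched rule's own `action` decide the outcome. Both are design choices the listings leave open.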
Merge Strategy & Conflict Resolution
# Automated Merge with Intelligent Conflict Resolution
from datetime import datetime

def merge_duplicate_records(primary_record, duplicate_records):
    """
    Merge duplicate records into a single 'golden record'
    Returns merged record and audit log
    """
    # Assumes records expose fields as attributes and that copy() returns
    # a dict-like object that supports item assignment.
    merged = primary_record.copy()
    audit_log = {
        'merge_date': datetime.now(),
        'primary_record_id': primary_record.id,
        'merged_record_ids': [r.id for r in duplicate_records],
        'field_resolutions': {},
        'conflicts_detected': [],
        'conflicts_resolved': []
    }
    # Field priority rules for conflict resolution
    field_priority = {
        'email': 'non_null',
        'phone': 'non_null',
        'full_name': 'most_complete',
        'company_name': 'most_frequent',
        'lead_source': 'primary_wins',
        'created_at': 'earliest',
        'updated_at': 'latest',
        'qualification_score': 'highest',
        'status': 'highest_priority'
    }
    all_records = [primary_record] + duplicate_records
    for field in get_all_fields(all_records):
        # Non-null values in record order, so the primary's value comes first
        values = [getattr(r, field, None) for r in all_records
                  if getattr(r, field, None) is not None]
        if not values:
            continue
        if field not in field_priority:
            # Default strategy: prefer non-null, then primary
            merged[field] = values[0]
            continue
        strategy = field_priority[field]
        if strategy == 'non_null':
            merged[field] = values[0]
        elif strategy == 'most_complete':
            merged[field] = max(values, key=lambda v: len(str(v)))
        elif strategy == 'most_frequent':
            merged[field] = max(set(values), key=values.count)
        elif strategy == 'primary_wins':
            # Fall back to the first non-null value if the primary has none
            primary_value = getattr(primary_record, field, None)
            merged[field] = primary_value if primary_value is not None else values[0]
        elif strategy == 'earliest':
            merged[field] = min(values)
        elif strategy == 'latest':
            merged[field] = max(values)
        elif strategy == 'highest':
            merged[field] = max(values)
        elif strategy == 'highest_priority':
            status_priority = {
                'converted': 5,
                'qualified': 4,
                'contacted': 3,
                'new': 2,
                'unqualified': 1
            }
            merged[field] = max(values,
                                key=lambda v: status_priority.get(v, 0))
        # Check if there were conflicts
        if len(set(str(v) for v in values)) > 1:
            audit_log['conflicts_detected'].append({
                'field': field,
                'values': values,
                'resolution': strategy
            })
    # Merge arrays (tags, notes, activities)
    array_fields = ['tags', 'notes', 'activities', 'interactions']
    for field in array_fields:
        all_values = []
        for record in all_records:
            # Treat a missing or None attribute as an empty list
            all_values.extend(getattr(record, field, None) or [])
        if all_values:
            # Remove duplicates while preserving order (items must be hashable)
            merged[field] = list(dict.fromkeys(all_values))
    # Combine engagement metrics
    if has_field(all_records, 'engagement_metrics'):
        merged['engagement_metrics'] = combine_engagement_metrics(
            [r.engagement_metrics for r in all_records
             if hasattr(r, 'engagement_metrics')]
        )
    # Mark duplicates as merged
    for dup in duplicate_records:
        dup.status = 'merged'
        dup.merged_into = primary_record.id
        dup.is_active = False
    audit_log['merge_complete'] = True
    return merged, audit_log
# Conflict Resolution Strategy Configuration
CONFLICT_RESOLUTION = {
    'auto_resolve_fields': [
        'email', 'phone', 'created_at'
    ],
    'flag_for_review_fields': [
        'lead_source', 'status', 'qualification_score'
    ],
    'require_manual_resolution_fields': [
        'account_owner', 'assigned_rep', 'territory'
    ],
    'auto_resolution_confidence_threshold': 0.95,
    'notification_rules': {
        'high_value_conflict': {
            'min_revenue': 1000000,
            'notify_users': ['sales_manager', 'admin']
        }
    }
}
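The listings do not show how `CONFLICT_RESOLUTION` is consumed. One plausible reading, sketched below under that assumption, is a per-field lookup that decides whether a conflict can be resolved automatically. `resolution_mode` and its return labels are hypothetical, and the config is a trimmed copy of the one above.

```python
CONFLICT_RESOLUTION = {
    'auto_resolve_fields': ['email', 'phone', 'created_at'],
    'flag_for_review_fields': ['lead_source', 'status', 'qualification_score'],
    'require_manual_resolution_fields': ['account_owner', 'assigned_rep', 'territory'],
    'auto_resolution_confidence_threshold': 0.95,
}


def resolution_mode(field, confidence, config=CONFLICT_RESOLUTION):
    """Classify a conflicting field as 'manual', 'review', or 'auto'."""
    # The most restrictive list is checked first
    if field in config['require_manual_resolution_fields']:
        return 'manual'
    if field in config['flag_for_review_fields']:
        return 'review'
    threshold = config['auto_resolution_confidence_threshold']
    if field in config['auto_resolve_fields'] and confidence >= threshold:
        return 'auto'
    # Assumed default: unlisted fields and low-confidence matches go to review
    return 'review'


if __name__ == "__main__":
    for field, score in [('email', 0.97), ('email', 0.90), ('territory', 0.99)]:
        print(field, score, resolution_mode(field, score))
```

Sending unlisted fields to review is the conservative choice here; a team that trusts its matching rules might prefer to auto-resolve them.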
Deduplication Pipeline Architecture
# Real-time deduplication pipeline
class DeduplicationPipeline:
    def __init__(self):
        self.detector = DuplicateDetector()
        self.cache = RedisCache()
        self.queue = MessageQueue()
    def process_new_record(self, record):
        """
        Process incoming record through deduplication pipeline
        """
        # Step 1: Pre-processing and normalization
        normalized = self.preprocess_record(record)
        # Step 2: Generate blocking keys for efficient search
        blocking_keys = self.generate_blocking_keys(normalized)
        # Step 3: Find candidate matches using blocking
        candidates = self.find_candidates(blocking_keys)
        # Step 4: Run duplicate detection on candidates
        potential_duplicates = self.detector.detect_duplicates(
            normalized, candidates
        )
        # Step 5: Determine action based on matches
        if not potential_duplicates:
            # No duplicates found - create new record
            return self.create_record(normalized)
        # Step 6: Apply merging logic
        highest_confidence = potential_duplicates[0]
        if highest_confidence['confidence'] >= 0.95:
            # Auto-merge high confidence matches
            return self.auto_merge(
                highest_confidence['record'],
                normalized
            )
        elif highest_confidence['confidence'] >= 0.75:
            # Flag for manual review
            return self.flag_for_review(
                highest_confidence['record'],
                normalized,
                potential_duplicates
            )
        else:
            # Low confidence - create new record with warning
            return self.create_record_with_warning(
                normalized,
                potential_duplicates
            )
    def generate_blocking_keys(self, record):
        """
        Generate keys to limit search space
        """
        keys = []
        # Email domain blocking (skip malformed addresses without '@')
        if record.email and '@' in record.email:
            domain = record.email.rsplit('@', 1)[1].lower()
            keys.append(f"domain:{domain}")
        # Company name blocking
        if record.company_name:
            normalized = self.normalize_company_name(
                record.company_name
            )
            keys.append(f"company:{normalized[:5]}")
        # Phone prefix blocking
        if record.phone:
            prefix = record.phone[:6]
            keys.append(f"phone_prefix:{prefix}")
        # Name n-gram blocking
        if record.full_name:
            bigrams = self.generate_ngrams(
                record.full_name, 2
            )
            keys.extend([f"name_bg:{bg}" for bg in bigrams[:3]])
        return keys
    def run_batch_deduplication(self, account_id):
        """
        Run deduplication across all records in an account
        """
        all_records = self.get_account_records(account_id)
        processed = set()
        duplicates_found = []
        for record in all_records:
            if record.id in processed:
                continue
            # Find all duplicates of this record, skipping records that were
            # already merged into an earlier primary
            potential_duplicates = self.detector.detect_duplicates(
                record,
                [r for r in all_records
                 if r.id != record.id and r.id not in processed]
            )
            if potential_duplicates:
                # Merge all duplicates into primary. Every match above the
                # detector's overall threshold is merged here, not only
                # high-confidence ones.
                duplicate_records = [d['record'] for d in
                                     potential_duplicates]
                merged_record, audit_log = merge_duplicate_records(
                    record, duplicate_records
                )
                duplicates_found.append({
                    'primary': record,
                    'duplicates': duplicate_records,
                    'merged': merged_record,
                    'audit_log': audit_log
                })
                processed.update([r.id for r in duplicate_records])
            processed.add(record.id)
        return duplicates_found
# Blocking key generation for efficient search
# standardize_phone and extract_area_code are assumed to be module-level
# helpers; neither is defined in this listing.
import re

def generate_blocking_keys(record):
    """Generate multiple keys to reduce search space"""
    keys = set()
    # Exact match blocking
    if record.email:
        keys.add(f"email_exact:{record.email.lower()}")
    if record.phone:
        keys.add(f"phone_std:{standardize_phone(record.phone)}")
    # Domain-based blocking
    if record.email:
        domain = record.email.split('@')[-1]
        keys.add(f"domain:{domain.lower()}")
    # Company name blocking (first 3 chars + length)
    if record.company:
        normalized = re.sub(r'[^a-z]', '',
                            record.company.lower())
        if len(normalized) >= 3:
            keys.add(f"company_prefix:{normalized[:3]}")
            keys.add(f"company_len:{len(normalized)}")
    # Phone area code blocking
    if record.phone:
        area_code = extract_area_code(record.phone)
        if area_code:
            keys.add(f"area_code:{area_code}")
    # Location-based blocking
    if record.city:
        keys.add(f"city:{record.city.lower()}")
    if record.postal_code:
        keys.add(f"postal_prefix:{record.postal_code[:3]}")
    # Name initials blocking
    if record.first_name and record.last_name:
        initials = f"{record.first_name[0]}{record.last_name[0]}".lower()
        keys.add(f"initials:{initials}")
        keys.add(f"fname:{record.first_name.lower()[:3]}")
    return list(keys)
# Bloom filter for quick negative matching
from pybloom_live import ScalableBloomFilter
class DuplicateBloomFilter:
    def __init__(self, error_rate=0.001):
        self.filter = ScalableBloomFilter(
            initial_capacity=1000,
            error_rate=error_rate
        )

    def add_record(self, record):
        """Add record identifiers to bloom filter"""
        if record.email:
            self.filter.add(f"email:{record.email.lower()}")
        if record.phone:
            self.filter.add(f"phone:{standardize_phone(record.phone)}")

    def might_contain(self, record):
        """Quick check if record might be a duplicate"""
        if record.email:
            if f"email:{record.email.lower()}" in self.filter:
                return True
        if record.phone:
            if f"phone:{standardize_phone(record.phone)}" in self.filter:
                return True
        return False
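The pipeline calls `find_candidates` with a list of blocking keys but does not show how candidates are looked up. One way to do it, sketched here as an in-memory inverted index, is to map each key to the set of record IDs that produced it. The `BlockingIndex` class and its `min_shared` parameter are hypothetical; a production system would more likely back this with a database index or a cache.

```python
from collections import defaultdict


class BlockingIndex:
    """Inverted index from blocking key to record IDs (illustrative only)."""

    def __init__(self):
        self._index = defaultdict(set)

    def add(self, record_id, keys):
        """Register a record under each of its blocking keys."""
        for key in keys:
            self._index[key].add(record_id)

    def candidates(self, keys, min_shared=1):
        """Return IDs of records sharing at least min_shared keys."""
        shared = defaultdict(int)
        for key in set(keys):  # de-duplicate so a repeated key counts once
            for record_id in self._index.get(key, ()):
                shared[record_id] += 1
        return {rid for rid, count in shared.items() if count >= min_shared}


if __name__ == "__main__":
    index = BlockingIndex()
    index.add(1, ["domain:acme.com", "initials:jd"])
    index.add(2, ["domain:other.com", "initials:jd"])
    incoming = ["domain:acme.com", "initials:jd"]
    print(index.candidates(incoming))                # records 1 and 2
    print(index.candidates(incoming, min_shared=2))  # record 1 only
```

Raising `min_shared` shrinks the candidate set at the cost of possibly missing duplicates that share only one key, so it trades recall for speed.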
Production Deployment Notes
- Indexing Strategy: Create database indexes on email, phone, company_domain, and normalized name fields to speed up duplicate detection queries.
- Caching Layer: Implement Redis caching for frequently accessed records and duplicate check results to reduce database load.
- Background Processing: Run batch deduplication as background jobs during off-peak hours. Use real-time checks only for incoming records.
- Data Privacy: Hash sensitive identifiers (email, phone) when using bloom filters or external matching services to avoid exposing PII.
- Audit Trail: Log all merge operations with before/after snapshots to enable rollback in case of errors.
- Rate Limiting: Throttle deduplication API calls to prevent overwhelming CRM systems during large imports.
- Data Freshness: Re-run deduplication periodically (daily/weekly) as new data arrives to catch newly created duplicates.
- False Positive Monitoring: Track merge accuracy and adjust weights/thresholds based on manual review outcomes.
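For the data privacy note above, a minimal sketch of pseudonymizing identifiers before they reach a bloom filter or an external service might look like the following. It uses a keyed hash (HMAC-SHA256) from the standard library rather than a plain hash, because emails and phone numbers are guessable and an unkeyed hash can be reversed by brute force. The function name `pseudonymize` and the key handling are assumptions for illustration.

```python
import hashlib
import hmac


def pseudonymize(kind, value, secret_key):
    """Return a hex HMAC-SHA256 digest of a normalized identifier.

    kind       -- label such as 'email' or 'phone', mixed into the message
    value      -- the raw identifier
    secret_key -- bytes; assumed to be loaded from a secrets store
    """
    normalized = value.strip().lower()
    message = f"{kind}:{normalized}".encode("utf-8")
    return hmac.new(secret_key, message, hashlib.sha256).hexdigest()


if __name__ == "__main__":
    key = b"example-key-do-not-use-in-production"
    token = pseudonymize("email", " Jane@Example.com ", key)
    # The same identifier always maps to the same token under one key,
    # so membership checks still work on the hashed values.
    print(token == pseudonymize("email", "jane@example.com", key))
```

The same key must be used when adding and when checking, and rotating it invalidates any previously stored tokens.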