CRM Duplicate Prevention Schema

Deduplication logic and matching algorithms for maintaining data integrity in automated CRM pipelines and lead management systems.

Explanation

This schema defines a comprehensive deduplication framework for CRM systems, enabling automated detection and resolution of duplicate records across multiple data sources. It implements probabilistic matching algorithms that consider multiple identity signals with configurable weights, ensuring high-accuracy duplicate detection while minimizing false positives.

The system supports both strict deterministic matching (exact email, phone) and fuzzy heuristic matching (name variations, company similarities) to handle real-world data quality issues. Each matching rule produces a score between 0 and 1; the weighted rule scores are combined into a composite confidence, and record pairs whose confidence meets the configured threshold are flagged for automated merging or manual review.

Duplicate Detection Schema

{  
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "DuplicateDetectionRule",
  "type": "object",
  "properties": {
    "rule_id": {
      "type": "string",
      "format": "uuid",
      "description": "Unique identifier for this rule"
    },
    "name": {
      "type": "string",
      "description": "Human-readable rule name"
    },
    "description": {
      "type": "string",
      "description": "Detailed explanation of what this rule detects"
    },
    "enabled": {
      "type": "boolean",
      "default": true
    },
    "match_type": {
      "type": "string",
      "enum": ["exact", "fuzzy", "phonetic", "composite"],
      "description": "Type of matching algorithm to apply"
    },
    "fields": {
      "type": "array",
      "items": { "type": "string" },
      "description": "CRM field names to compare"
    },
    "weight": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Importance weight for this rule in composite scoring"
    },
    "threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "description": "Minimum score to consider records a match"
    },
    "action": {
      "type": "string",
      "enum": ["auto_merge", "flag_review", "block_import"],
      "default": "flag_review"
    },
    "priority": {
      "type": "integer",
      "default": 100,
      "description": "Rule evaluation order (lower = higher priority)"
    },
    "conditions": {
      "type": "object",
      "description": "Additional conditions that must be met",
      "properties": {
        "same_account": { "type": "boolean" },
        "created_within_hours": { "type": "integer" },
        "exclude_deleted": { "type": "boolean", "default": true }
      }
    },
    "normalization": {
      "type": "object",
      "description": "Data normalization rules before comparison",
      "properties": {
        "lowercase": { "type": "boolean" },
        "trim_whitespace": { "type": "boolean", "default": true },
        "remove_special_chars": { "type": "boolean" },
        "standardize_phone_format": { "type": "boolean" }
      }
    }
  }
}

Matching Algorithms

# Comprehensive Duplicate Detection Engine
class DuplicateDetector:
    """Score a record against existing records using weighted matching rules.

    Each enabled rule yields a score in [0, 1]; the weighted average of the
    rule scores is the composite confidence for a candidate pair.

    ``load_rules``, ``get_field_value`` and ``composite_match`` are called
    but not defined in this class; they must be supplied elsewhere
    (e.g. by a subclass).
    """

    # Minimum composite confidence for a candidate to be reported.
    # NOTE(review): the original code read ``self.overall_threshold`` without
    # ever defining it; 0.5 is a placeholder default -- tune per deployment
    # or pass ``overall_threshold`` to the constructor.
    overall_threshold = 0.5

    def __init__(self, overall_threshold=None):
        """
        Load the matching rules and set up the phonetic encoder.

        overall_threshold: optional override for the class-level default
        composite-confidence cut-off.
        """
        self.rules = self.load_rules()
        self.phonetic_encoder = Soundex()  # or Metaphone, NYSIIS
        if overall_threshold is not None:
            self.overall_threshold = overall_threshold

    def detect_duplicates(self, new_record, existing_records):
        """
        Compare new record against existing records
        Returns list of potential duplicates with confidence scores,
        sorted by confidence (highest first). Each entry is a dict with
        'record', 'confidence' and 'matched_rules' keys.
        """
        # Filtering and ordering the rules does not depend on the candidate,
        # so do it once instead of once per existing record.
        active_rules = sorted(
            (rule for rule in self.rules if rule.enabled),
            key=lambda r: r.priority,
        )

        potential_matches = []

        for existing in existing_records:
            if existing.id == new_record.id:
                continue

            total_score = 0
            max_possible = 0
            matched_rules = []

            for rule in active_rules:
                rule_score = self.evaluate_rule(rule, new_record, existing)

                # NOTE(review): a rule with no comparable fields scores 0 but
                # still adds its weight to max_possible, so missing data
                # lowers the composite confidence.
                total_score += rule_score * rule.weight
                max_possible += rule.weight

                if rule_score >= rule.threshold:
                    matched_rules.append({
                        'rule_id': rule.rule_id,
                        'rule_name': rule.name,
                        'score': rule_score,
                        'threshold': rule.threshold
                    })

            # Calculate composite confidence score (0-1)
            if max_possible > 0:
                confidence = min(total_score / max_possible, 1.0)
            else:
                confidence = 0

            if confidence >= self.overall_threshold:
                potential_matches.append({
                    'record': existing,
                    'confidence': confidence,
                    'matched_rules': matched_rules
                })

        # Sort by confidence (highest first)
        return sorted(potential_matches,
                      key=lambda x: x['confidence'],
                      reverse=True)

    def evaluate_rule(self, rule, record_a, record_b):
        """
        Evaluate a single matching rule against two records
        Returns score from 0 to 1 (average over the comparable fields,
        0 when no field could be compared).

        Raises ValueError if the rule's match_type is not one of
        'exact', 'fuzzy', 'phonetic' or 'composite'.
        """
        scores = []
        # Rules may omit the optional normalization block entirely.
        normalization = getattr(rule, 'normalization', None)

        for field in rule.fields:
            val_a = self.get_field_value(record_a, field)
            val_b = self.get_field_value(record_b, field)

            if val_a is None or val_b is None:
                continue

            # Apply normalization
            val_a = self.normalize_value(val_a, normalization)
            val_b = self.normalize_value(val_b, normalization)

            # A value that is blank after normalization carries no identity
            # signal; treating two blanks as an exact match would produce
            # false-positive duplicates.
            if not val_a or not val_b:
                continue

            # Calculate field-level score based on match type
            if rule.match_type == 'exact':
                field_score = 1.0 if val_a == val_b else 0.0

            elif rule.match_type == 'fuzzy':
                field_score = self.fuzzy_string_match(val_a, val_b)

            elif rule.match_type == 'phonetic':
                encoded_a = self.phonetic_encoder.encode(val_a)
                encoded_b = self.phonetic_encoder.encode(val_b)
                field_score = 1.0 if encoded_a == encoded_b else 0.0

            elif rule.match_type == 'composite':
                field_score = self.composite_match(val_a, val_b)

            else:
                # Previously this fell through with field_score unbound.
                raise ValueError(
                    f"Unsupported match_type: {rule.match_type!r}"
                )

            scores.append(field_score)

        # Return average score for all fields in this rule
        return sum(scores) / len(scores) if scores else 0

    def fuzzy_string_match(self, str_a, str_b):
        """
        Calculate string similarity using Levenshtein distance
        Returns score from 0 (no match) to 1 (identical)
        """
        if not str_a or not str_b:
            return 0

        lowered_a = str_a.lower()
        lowered_b = str_b.lower()

        # Calculate edit distance
        distance = levenshtein_distance(lowered_a, lowered_b)

        # Normalize to 0-1 range. Lengths are taken from the lowercased
        # strings because lower() can change a string's length, and the
        # distance was computed on those strings.
        max_len = max(len(lowered_a), len(lowered_b))
        similarity = max(0.0, 1 - (distance / max_len))

        # Apply non-linear scaling (more tolerant for long strings).
        # similarity is clamped to >= 0 above: a negative base raised to a
        # fractional power would yield a complex number.
        if max_len >= 10:
            similarity = similarity ** 0.8

        return max(0, min(1, similarity))

    def normalize_value(self, value, normalization_rules):
        """
        Apply normalization rules to a value

        Non-string values are converted with str(). A missing or empty
        rule set still applies the defaults (trim_whitespace=True).
        """
        if not isinstance(value, str):
            value = str(value)

        # Treat None / {} as "all defaults" so trim_whitespace's default of
        # True applies consistently.
        rules = normalization_rules or {}

        if rules.get('trim_whitespace', True):
            value = value.strip()

        if rules.get('lowercase', False):
            value = value.lower()

        if rules.get('remove_special_chars', False):
            value = re.sub(r'[^a-zA-Z0-9\s]', '', value)

        if rules.get('standardize_phone_format', False):
            value = self.standardize_phone(value)

        return value

    def standardize_phone(self, phone):
        """
        Convert phone number to a '+<digits>' form.

        A 10-digit number is assumed to be a US number and gets a leading
        country code '1'; any other digit count is passed through unchanged
        after the '+'. Returns '' when the input contains no digits, so
        placeholders such as 'n/a' do not all collapse to the same value.
        """
        # Remove all non-digit characters
        digits = re.sub(r'\D', '', str(phone))

        if not digits:
            return ''

        if len(digits) == 10:  # US number without country code
            digits = '1' + digits

        return '+' + digits

Field-Specific Matching Rules

# Predefined matching rules for common CRM fields
# NOTE(review): the rule_id values below are readable slugs, while the JSON
# schema for rules declares "format": "uuid" -- confirm which is intended.
# NOTE(review): these are plain dicts, whereas DuplicateDetector reads rules
# via attributes (rule.priority, rule.weight, ...); presumably a loader
# converts them -- verify against load_rules().
STANDARD_MATCHING_RULES = [
    {
        "rule_id": "email_exact_match",
        "name": "Email Exact Match",
        "description": "Matches when email addresses are identical (case-insensitive)",
        "match_type": "exact",
        "fields": ["email"],
        "weight": 1.0,
        "threshold": 0.95,
        "action": "auto_merge",
        "priority": 1,
        "normalization": {
            "lowercase": True,
            "trim_whitespace": True
        }
    },
    {
        "rule_id": "phone_exact_match",
        "name": "Phone Number Exact Match",
        "description": "Matches when phone numbers normalize to same value",
        "match_type": "exact",
        "fields": ["phone", "mobile_phone"],
        "weight": 1.0,
        "threshold": 0.95,
        "action": "auto_merge",
        "priority": 1,
        "normalization": {
            "standardize_phone_format": True
        }
    },
    {
        "rule_id": "domain_name_fuzzy",
        "name": "Company Domain + Name Fuzzy",
        "description": "Matches same domain with fuzzy name matching",
        "match_type": "composite",
        "fields": ["company_domain", "company_name"],
        "weight": 0.8,
        "threshold": 0.75,
        "action": "flag_review",
        "priority": 2,
        "conditions": {
            "same_account": False
        }
    },
    {
        "rule_id": "name_phonetic",
        "name": "Name Phonetic Match",
        "description": "Matches similar-sounding names using phonetic encoding",
        "match_type": "phonetic",
        "fields": ["first_name", "last_name"],
        "weight": 0.5,
        "threshold": 0.7,
        "action": "flag_review",
        "priority": 3,
        "conditions": {
            "created_within_hours": 24
        }
    },
    {
        "rule_id": "company_duplicate",
        "name": "Company Name Fuzzy Match",
        "description": "Detects duplicate companies with name variations",
        "match_type": "fuzzy",
        "fields": ["company_name"],
        "weight": 0.6,
        "threshold": 0.8,
        "action": "flag_review",
        "priority": 2,
        "normalization": {
            "lowercase": True,
            "remove_special_chars": True,
            "trim_whitespace": True
        }
    }
]

Merge Strategy & Conflict Resolution

# Automated Merge with Intelligent Conflict Resolution
def _dedupe_preserving_order(items):
    """Return items with duplicates removed, keeping first-seen order."""
    try:
        return list(dict.fromkeys(items))
    except TypeError:
        # Unhashable items (e.g. dicts): fall back to equality comparison.
        unique = []
        for item in items:
            if item not in unique:
                unique.append(item)
        return unique


def merge_duplicate_records(primary_record, duplicate_records):
    """
    Merge duplicate records into a single 'golden record'
    Returns merged record and audit log

    primary_record: record whose copy() seeds the merged result; it is
        read by attribute and the copy is written by key.
    duplicate_records: records folded into the primary. Each one is
        mutated in place: status='merged', merged_into=<primary id>,
        is_active=False.

    Field values are resolved per the strategy table below; fields without
    an entry take the first non-null value, with the primary checked first.
    """
    # Local import keeps this block self-contained.
    from collections import Counter

    duplicate_records = list(duplicate_records)

    merged = primary_record.copy()
    # NOTE(review): 'field_resolutions' and 'conflicts_resolved' are created
    # here but nothing in this function populates them.
    audit_log = {
        'merge_date': datetime.now(),
        'primary_record_id': primary_record.id,
        'merged_record_ids': [r.id for r in duplicate_records],
        'field_resolutions': {},
        'conflicts_detected': [],
        'conflicts_resolved': []
    }

    # Field priority rules for conflict resolution
    field_priority = {
        'email': 'non_null',
        'phone': 'non_null',
        'full_name': 'most_complete',
        'company_name': 'most_frequent',
        'lead_source': 'primary_wins',
        'created_at': 'earliest',
        'updated_at': 'latest',
        'qualification_score': 'highest',
        'status': 'highest_priority'
    }

    # Ranking used by the 'highest_priority' strategy; unknown statuses
    # rank lowest (0).
    status_priority = {
        'converted': 5,
        'qualified': 4,
        'contacted': 3,
        'new': 2,
        'unqualified': 1
    }

    all_records = [primary_record] + duplicate_records

    for field in get_all_fields(all_records):
        candidates = (getattr(r, field, None) for r in all_records)
        values = [v for v in candidates if v is not None]

        if not values:
            continue

        # Default strategy: prefer non-null, then primary (the primary is
        # first in all_records, so values[0] is its value when it has one).
        strategy = field_priority.get(field, 'first_non_null')

        if strategy in ('non_null', 'first_non_null'):
            merged[field] = values[0]

        elif strategy == 'most_complete':
            merged[field] = max(values, key=lambda v: len(str(v)))

        elif strategy == 'most_frequent':
            # Counter keeps insertion order, so ties go to the value seen
            # first instead of depending on set iteration order.
            merged[field] = Counter(values).most_common(1)[0][0]

        elif strategy == 'primary_wins':
            # The primary may lack the field or hold None; fall back to the
            # first non-null value rather than raising AttributeError.
            primary_value = getattr(primary_record, field, None)
            merged[field] = (values[0] if primary_value is None
                             else primary_value)

        elif strategy == 'earliest':
            merged[field] = min(values)

        elif strategy in ('latest', 'highest'):
            merged[field] = max(values)

        elif strategy == 'highest_priority':
            merged[field] = max(values,
                                key=lambda v: status_priority.get(v, 0))

        # Check if there were conflicts. This now also runs for
        # default-strategy fields, which previously skipped the audit.
        if len(set(str(v) for v in values)) > 1:
            audit_log['conflicts_detected'].append({
                'field': field,
                'values': values,
                'resolution': strategy
            })

    # Merge arrays (tags, notes, activities)
    array_fields = ['tags', 'notes', 'activities', 'interactions']
    for field in array_fields:
        all_values = []
        for record in all_records:
            # A missing attribute or an explicit None contributes nothing.
            all_values.extend(getattr(record, field, None) or [])

        if all_values:
            # Remove duplicates while preserving order
            merged[field] = _dedupe_preserving_order(all_values)

    # Combine engagement metrics
    if has_field(all_records, 'engagement_metrics'):
        merged['engagement_metrics'] = combine_engagement_metrics(
            [r.engagement_metrics for r in all_records
             if hasattr(r, 'engagement_metrics')]
        )

    # Mark duplicates as merged
    for dup in duplicate_records:
        dup.status = 'merged'
        dup.merged_into = primary_record.id
        dup.is_active = False

    audit_log['merge_complete'] = True

    return merged, audit_log

# Conflict Resolution Strategy Configuration
# Declarative settings for merge-conflict handling. Fields are grouped by
# how a conflict on them is meant to be handled.
# NOTE(review): nothing visible in this section reads this mapping --
# confirm where it is consumed.
CONFLICT_RESOLUTION = {
    'auto_resolve_fields': [
        'email',
        'phone',
        'created_at',
    ],
    'flag_for_review_fields': [
        'lead_source',
        'status',
        'qualification_score',
    ],
    'require_manual_resolution_fields': [
        'account_owner',
        'assigned_rep',
        'territory',
    ],
    'auto_resolution_confidence_threshold': 0.95,
    'notification_rules': {
        'high_value_conflict': {
            'min_revenue': 1_000_000,
            'notify_users': [
                'sales_manager',
                'admin',
            ],
        },
    },
}

Deduplication Pipeline Architecture

# Real-time deduplication pipeline
class DeduplicationPipeline:
    """Route incoming records through blocking, detection and merge/review.

    Several methods used here (preprocess_record, find_candidates,
    create_record, auto_merge, flag_for_review, create_record_with_warning,
    normalize_company_name, generate_ngrams, get_account_records) are not
    defined in this class and must be supplied elsewhere.
    """

    # Confidence cut-offs used by process_new_record. Class-level defaults
    # keep the original hard-coded values; override per instance via the
    # constructor.
    auto_merge_threshold = 0.95
    review_threshold = 0.75

    def __init__(self, auto_merge_threshold=None, review_threshold=None):
        """
        Create the detector, cache and queue used by the pipeline.

        auto_merge_threshold / review_threshold: optional overrides for the
        class-level confidence cut-offs.
        """
        self.detector = DuplicateDetector()
        self.cache = RedisCache()
        self.queue = MessageQueue()
        if auto_merge_threshold is not None:
            self.auto_merge_threshold = auto_merge_threshold
        if review_threshold is not None:
            self.review_threshold = review_threshold

    def process_new_record(self, record):
        """
        Process incoming record through deduplication pipeline

        Returns whatever the chosen action returns: create_record,
        auto_merge, flag_for_review or create_record_with_warning.
        """
        # Step 1: Pre-processing and normalization
        normalized = self.preprocess_record(record)

        # Step 2: Generate blocking keys for efficient search
        blocking_keys = self.generate_blocking_keys(normalized)

        # Step 3: Find candidate matches using blocking
        candidates = self.find_candidates(blocking_keys)

        # Step 4: Run duplicate detection on candidates
        potential_duplicates = self.detector.detect_duplicates(
            normalized, candidates
        )

        # Step 5: Determine action based on matches
        if not potential_duplicates:
            # No duplicates found - create new record
            return self.create_record(normalized)

        # Step 6: Apply merging logic. detect_duplicates returns matches
        # sorted by confidence, so the first entry is the best one.
        highest_confidence = potential_duplicates[0]

        if highest_confidence['confidence'] >= self.auto_merge_threshold:
            # Auto-merge high confidence matches
            return self.auto_merge(
                highest_confidence['record'],
                normalized
            )

        elif highest_confidence['confidence'] >= self.review_threshold:
            # Flag for manual review
            return self.flag_for_review(
                highest_confidence['record'],
                normalized,
                potential_duplicates
            )

        else:
            # Low confidence - create new record with warning
            return self.create_record_with_warning(
                normalized,
                potential_duplicates
            )

    def generate_blocking_keys(self, record):
        """
        Generate keys to limit search space

        Returns a list of 'domain:', 'company:', 'phone_prefix:' and
        'name_bg:' keys for whichever of those fields the record has.
        """
        keys = []

        # Email domain blocking. rpartition tolerates a malformed address
        # with no '@' (previously an IndexError); such an address simply
        # contributes no domain key.
        if record.email:
            _, separator, domain = record.email.rpartition('@')
            if separator and domain:
                keys.append(f"domain:{domain.lower()}")

        # Company name blocking
        if record.company_name:
            normalized = self.normalize_company_name(
                record.company_name
            )
            keys.append(f"company:{normalized[:5]}")

        # Phone prefix blocking
        # NOTE(review): uses the raw phone string, so differently formatted
        # copies of the same number land in different blocks. Left as-is
        # because changing the key format would invalidate stored keys.
        if record.phone:
            prefix = record.phone[:6]
            keys.append(f"phone_prefix:{prefix}")

        # Name n-gram blocking
        if record.full_name:
            bigrams = self.generate_ngrams(
                record.full_name, 2
            )
            keys.extend([f"name_bg:{bg}" for bg in bigrams[:3]])

        return keys

    def run_batch_deduplication(self, account_id):
        """
        Run deduplication across all records in an account

        Returns a list of dicts with 'primary', 'duplicates', 'merged' and
        'audit_log' keys, one per primary record that had duplicates.
        """
        all_records = self.get_account_records(account_id)
        processed = set()
        duplicates_found = []

        for record in all_records:
            if record.id in processed:
                continue

            # Find all duplicates of this record. Records already handled
            # (as a primary or merged into one) are excluded so that no
            # record is merged into two different primaries.
            candidates = [
                r for r in all_records
                if r.id != record.id and r.id not in processed
            ]
            potential_duplicates = self.detector.detect_duplicates(
                record, candidates
            )

            if potential_duplicates:
                # Merge all duplicates into primary
                duplicate_records = [d['record'] for d in
                                     potential_duplicates]

                merged_record, audit_log = merge_duplicate_records(
                    record, duplicate_records
                )

                duplicates_found.append({
                    'primary': record,
                    'duplicates': duplicate_records,
                    'merged': merged_record,
                    'audit_log': audit_log
                })

                processed.update(r.id for r in duplicate_records)

            processed.add(record.id)

        return duplicates_found
# Blocking key generation for efficient search
def generate_blocking_keys(record):
    """Collect the distinct blocking keys for a record.

    Keys are gathered in a set, so repeats collapse and the order of the
    returned list is not defined.
    """
    collected = set()

    # Keys built from the full email address and the standardized phone.
    if record.email:
        collected.add(f"email_exact:{record.email.lower()}")

    if record.phone:
        collected.add(f"phone_std:{standardize_phone(record.phone)}")

    # Email domain: everything after the last '@'.
    if record.email:
        email_domain = record.email.rpartition('@')[2]
        collected.add(f"domain:{email_domain.lower()}")

    # Company: lowercase letters only; prefix key needs >= 3 letters,
    # the length key is always emitted.
    if record.company:
        letters_only = re.sub(r'[^a-z]', '',
                              record.company.lower())
        if len(letters_only) >= 3:
            collected.add(f"company_prefix:{letters_only[:3]}")
        collected.add(f"company_len:{len(letters_only)}")

    # Phone area code, when one can be extracted.
    if record.phone:
        code = extract_area_code(record.phone)
        if code:
            collected.add(f"area_code:{code}")

    # Location keys.
    if record.city:
        collected.add(f"city:{record.city.lower()}")

    if record.postal_code:
        collected.add(f"postal_prefix:{record.postal_code[:3]}")

    # Name keys require both first and last name.
    if record.first_name and record.last_name:
        name_initials = f"{record.first_name[0]}{record.last_name[0]}".lower()
        collected.update((
            f"initials:{name_initials}",
            f"fname:{record.first_name.lower()[:3]}",
        ))

    return list(collected)

# Bloom filter for quick negative matching
from pybloom_live import ScalableBloomFilter

class DuplicateBloomFilter:
    """Membership pre-check over record email and phone identifiers."""

    def __init__(self, error_rate=0.001):
        self.filter = ScalableBloomFilter(
            initial_capacity=1000,
            error_rate=error_rate
        )

    def _identifier_keys(self, record):
        """Lazily yield the filter keys for a record, email before phone."""
        if record.email:
            yield f"email:{record.email.lower()}"
        if record.phone:
            yield f"phone:{standardize_phone(record.phone)}"

    def add_record(self, record):
        """Register the record's email and phone keys in the filter."""
        for identifier in self._identifier_keys(record):
            self.filter.add(identifier)

    def might_contain(self, record):
        """Return True if any of the record's keys is in the filter."""
        # any() stops at the first hit, so the phone key is only built when
        # the email key is absent or not in the filter.
        return any(
            identifier in self.filter
            for identifier in self._identifier_keys(record)
        )

Production Deployment Notes

Download as Markdown

Related Case Studies

View Implementation Case Studies →

Send the Broken Workflow

Get a diagnostic review of your current automation stack and a prioritized implementation plan for agentic AI.

Send the Broken Workflow