Source code for metaclass_registry.core

"""
Generic metaclass infrastructure for automatic plugin registration.

This module provides reusable metaclass infrastructure for Pattern A registry systems
(1:1 class-to-plugin mapping with automatic discovery). It eliminates code duplication
across MicroscopeHandlerMeta, StorageBackendMeta, and ContextProviderMeta.

Pattern Selection Guide:
-----------------------
Use AutoRegisterMeta (Pattern A) when:
- You have a 1:1 mapping between classes and plugins
- Plugins should be automatically discovered and registered
- Registration happens at class definition time
- Simple metadata (just a key and maybe one secondary registry)

Use Service Pattern (Pattern B) when:
- You have many-to-one mapping (multiple items per plugin)
- Complex metadata (FunctionMetadata with 8+ fields)
- Need aggregation across multiple sources
- Examples: Function registry, Format registry

Use Functional Registry (Pattern C) when:
- Simple type-to-handler mappings
- No state needed
- Functional programming style preferred
- Examples: Widget creation registries

Use Manual Registration (Pattern D) when:
- Complex initialization logic required
- Explicit control over registration timing needed
- Very few plugins (< 3)
- Examples: ZMQ servers, Pipeline steps

Architecture:
------------
AutoRegisterMeta uses a configuration-driven approach:
1. RegistryConfig defines registration behavior
2. AutoRegisterMeta applies the configuration during class creation
3. Domain-specific metaclasses provide thin wrappers with their config

This maintains domain-specific features while eliminating duplication.
"""

import importlib
import logging
import threading
from abc import ABCMeta
from dataclasses import dataclass
from enum import Enum
from enum import EnumMeta
from typing import Dict, Type, Optional, Callable, Any

logger = logging.getLogger(__name__)

# Type aliases for clarity
RegistryDict = Dict[str, Type]
KeyExtractor = Callable[[str, Type], str]

# Constants for key sources
PRIMARY_KEY = 'primary'


[docs] class RegistryKeyAttribute(str, Enum): """Common class-attribute names used as registry keys.""" REGISTRY_KEY = "registry_key" STRATEGY_KEY = "strategy_key" STRATEGY_LABEL = "strategy_label" VALUE_TYPE_LABEL = "value_type_label" LAYOUT_KEY = "layout_key"
[docs] class SecondaryRegistryDict(dict): """ Dict for secondary registries that auto-triggers primary registry discovery. When accessed, this dict triggers discovery of the primary registry, which populates both the primary and secondary registries. """
[docs] def __init__(self, primary_registry: 'LazyDiscoveryDict'): super().__init__() self._primary_registry = primary_registry
def _ensure_discovered(self): """Trigger discovery of primary registry (which populates this secondary registry).""" if hasattr(self._primary_registry, '_discover'): self._primary_registry._discover() def __getitem__(self, key): self._ensure_discovered() return super().__getitem__(key) def __contains__(self, key): self._ensure_discovered() return super().__contains__(key) def __iter__(self): self._ensure_discovered() return super().__iter__() def __len__(self): self._ensure_discovered() return super().__len__()
[docs] def keys(self): self._ensure_discovered() return super().keys()
[docs] def values(self): self._ensure_discovered() return super().values()
[docs] def items(self): self._ensure_discovered() return super().items()
[docs] def get(self, key, default=None): self._ensure_discovered() return super().get(key, default)
[docs] class LazyDiscoveryDict(dict): """ Dict that auto-discovers plugins on first access with optional caching. Supports caching discovered plugins to speed up subsequent application starts. Cache is validated against package version and file modification times. Thread-safe: Uses locking to ensure discovery happens only once even when accessed from multiple threads simultaneously. """
[docs] def __init__(self, enable_cache: bool = True): """ Initialize lazy discovery dict. Args: enable_cache: If True, use caching to speed up discovery """ super().__init__() self._base_class = None self._config = None self._discovered = False self._enable_cache = enable_cache self._cache_manager = None self._discovery_lock = threading.RLock() # Reentrant lock for same-thread re-entry self._discovery_package = None # Store for pickling support
_cache_components = None
[docs] @classmethod def cache_components(cls): """Return lazily imported cache components without a module helper.""" if cls._cache_components is None: from metaclass_registry.cache import ( RegistryCacheManager, CacheConfig, serialize_plugin_class, deserialize_plugin_class, get_package_file_mtimes ) cls._cache_components = { 'RegistryCacheManager': RegistryCacheManager, 'CacheConfig': CacheConfig, 'serialize_plugin_class': serialize_plugin_class, 'deserialize_plugin_class': deserialize_plugin_class, 'get_package_file_mtimes': get_package_file_mtimes } return cls._cache_components
def _set_config(self, base_class: Type, config: 'RegistryConfig') -> None: self._base_class = base_class self._config = config self._discovery_package = config.discovery_package # Store for pickling support # Initialize cache manager if caching is enabled if self._enable_cache and config.discovery_package: try: cache_utils = self.cache_components() self._cache_manager = cache_utils['RegistryCacheManager']( cache_name=f"{config.registry_name.replace(' ', '_')}_registry", version_getter=self._get_version, serializer=cache_utils['serialize_plugin_class'], deserializer=cache_utils['deserialize_plugin_class'], config=cache_utils['CacheConfig']( max_age_days=7, check_mtimes=True # Validate file modifications ) ) except Exception as e: logger.debug(f"Failed to initialize cache manager: {e}") self._cache_manager = None def _get_version(self) -> str: """ Get version from the discovery package for cache validation. Returns: Version string or 'unknown' if unable to determine """ try: # Try to get version from the root package if self._discovery_package: root_package = self._discovery_package.split('.')[0] mod = __import__(root_package) return getattr(mod, '__version__', 'unknown') except: pass return "unknown" def _discover(self) -> None: """ Run discovery once, using cache if available. Thread-safe: Discovery happens inside the lock to ensure atomicity. The lock is held for the entire duration to prevent other threads from reading a partially-populated registry. CRITICAL: No fast path check outside the lock! All threads must acquire the lock to ensure they don't read a partially-populated registry. """ # No config = nothing to discover if not self._config or not self._config.discovery_package: return # ALWAYS acquire lock - no fast path to avoid race condition # RLock allows same thread to re-acquire during module imports with self._discovery_lock: # Check if already discovered (inside lock) if self._discovered: return # Mark as discovered to prevent infinite re-entry from same thread # (module imports during discovery might access registry) self._discovered = True # Try to load from cache first if self._cache_manager: try: cached_plugins = self._cache_manager.load_cache() if cached_plugins is not None: # Reconstruct registry from cache self.update(cached_plugins) logger.info( f"✅ Loaded {len(self)} {self._config.registry_name}s from cache" ) return except Exception as e: logger.debug(f"Cache load failed for {self._config.registry_name}: {e}") # Cache miss or disabled - perform full discovery try: pkg = importlib.import_module(self._config.discovery_package) if self._config.discovery_function: self._config.discovery_function( pkg.__path__, f"{self._config.discovery_package}.", self._base_class ) else: from metaclass_registry.discovery import ( discover_registry_classes, discover_registry_classes_recursive ) func = ( discover_registry_classes_recursive if self._config.discovery_recursive else discover_registry_classes ) func(pkg.__path__, f"{self._config.discovery_package}.", self._base_class) logger.debug(f"Discovered {len(self)} {self._config.registry_name}s") # Save to cache if enabled if self._cache_manager: try: cache_utils = self.cache_components() file_mtimes = cache_utils['get_package_file_mtimes']( self._config.discovery_package ) self._cache_manager.save_cache(dict(self), file_mtimes) except Exception as e: logger.debug(f"Failed to save cache for {self._config.registry_name}: {e}") except Exception as e: logger.warning(f"Discovery failed: {e}") # Lock released here - registry is now fully populated and safe to read def __getitem__(self, k): with self._discovery_lock: self._discover() return super().__getitem__(k) def __contains__(self, k): with self._discovery_lock: self._discover() return super().__contains__(k) def __iter__(self): with self._discovery_lock: self._discover() return super().__iter__() def __len__(self): with self._discovery_lock: self._discover() return super().__len__()
[docs] def keys(self): with self._discovery_lock: self._discover() return super().keys()
[docs] def values(self): with self._discovery_lock: self._discover() return super().values()
[docs] def items(self): with self._discovery_lock: self._discover() return super().items()
[docs] def get(self, k, default=None): with self._discovery_lock: self._discover() return super().get(k, default)
[docs] def __getstate__(self): """ Return state for pickling, excluding non-picklable objects. The _discovery_lock and _cache_manager are excluded since they contain non-picklable objects (RLock and potentially closures). """ state = dict(self) # Copy the dict contents state.update({ '_base_class': self._base_class, '_config': self._config, '_discovered': self._discovered, '_enable_cache': self._enable_cache, '_discovery_package': self._discovery_package, # Exclude: _discovery_lock, _cache_manager }) return state
[docs] def __setstate__(self, state): """ Restore state from pickle, recreating non-picklable objects. The _discovery_lock is recreated as a new RLock. The _cache_manager is set to None and will be reinitialized if needed. """ # Restore dict contents for key, value in state.items(): if key.startswith('_'): setattr(self, key, value) else: self[key] = value # Recreate non-picklable objects self._discovery_lock = threading.RLock() self._cache_manager = None # Will be reinitialized if needed
[docs] @dataclass(frozen=True) class SecondaryRegistry: """Configuration for a secondary registry (e.g., metadata handlers).""" registry_dict: RegistryDict key_source: str # 'primary' or attribute name attr_name: str # Attribute to check on the class
[docs] @dataclass(frozen=True) class RegistryFamily: """Declarative registry-family configuration for ``AutoRegisterMeta`` roots. ``AutoRegisterMeta`` historically used low-level class attributes such as ``__registry_key__`` and ``__skip_if_no_key__``. Those attributes remain the compatibility surface, but registry roots can now declare one nominal family object and let the metaclass install the legacy attributes. """ key_attribute: str | RegistryKeyAttribute skip_if_no_key: bool = True registry_name: Optional[str] = None @property def key_attribute_name(self) -> str: """Return the string attribute name consumed by ``AutoRegisterMeta``.""" if isinstance(self.key_attribute, RegistryKeyAttribute): return self.key_attribute.value return self.key_attribute
[docs] def apply_to_class(self, cls: Type, explicit_attrs: dict) -> None: """Install legacy metaclass attributes for compatibility/introspection.""" if '__registry_key__' not in explicit_attrs: cls.__registry_key__ = self.key_attribute_name if '__skip_if_no_key__' not in explicit_attrs: cls.__skip_if_no_key__ = self.skip_if_no_key if self.registry_name is not None and '__registry_name__' not in explicit_attrs: cls.__registry_name__ = self.registry_name
[docs] @dataclass(frozen=True) class RegistryConfig: """ Configuration for automatic class registration behavior. This dataclass encapsulates all the configuration needed for metaclass registration, making the pattern explicit and easy to understand. Attributes: registry_dict: Dictionary to register classes into (e.g., MICROSCOPE_HANDLERS) key_attribute: Name of class attribute containing the registration key (e.g., '_microscope_type', '_backend_type', '_context_type') key_extractor: Optional function to derive key from class name if key_attribute is not set. Signature: (class_name: str, cls: Type) -> str skip_if_no_key: If True, skip registration when key_attribute is None. If False, require either key_attribute or key_extractor. secondary_registries: Optional list of secondary registry configurations log_registration: If True, log debug message when class is registered registry_name: Human-readable name for logging (e.g., 'microscope handler') discovery_package: Optional package name to auto-discover (e.g., 'openhcs.microscopes') discovery_recursive: If True, use recursive discovery (default: False) Examples: # Microscope handlers with name-based key extraction and secondary registry RegistryConfig( registry_dict=MICROSCOPE_HANDLERS, key_attribute='_microscope_type', key_extractor=extract_key_from_handler_suffix, skip_if_no_key=False, secondary_registries=[ SecondaryRegistry( registry_dict=METADATA_HANDLERS, key_source=PRIMARY_KEY, attr_name='_metadata_handler_class' ) ], log_registration=True, registry_name='microscope handler' ) # Storage backends with explicit key and skip-if-none behavior RegistryConfig( registry_dict=STORAGE_BACKENDS, key_attribute='_backend_type', skip_if_no_key=True, registry_name='storage backend' ) # Context providers with simple explicit key RegistryConfig( registry_dict=CONTEXT_PROVIDERS, key_attribute='_context_type', skip_if_no_key=True, registry_name='context provider' ) """ registry_dict: RegistryDict key_attribute: str key_extractor: Optional[KeyExtractor] = None skip_if_no_key: bool = False secondary_registries: Optional[list[SecondaryRegistry]] = None log_registration: bool = True registry_name: str = "plugin" discovery_package: Optional[str] = None # Auto-inferred from base class module if None discovery_recursive: bool = False discovery_function: Optional[Callable] = None # Custom discovery function
[docs] class AutoRegisterMeta(ABCMeta): """ Generic metaclass for automatic plugin registration (Pattern A). This metaclass automatically registers concrete classes in a global registry when they are defined, eliminating the need for manual registration calls. Features: - Skips abstract classes (checks __abstractmethods__) - Supports explicit keys via class attributes - Supports derived keys via key extraction functions - Supports secondary registries (e.g., metadata handlers) - Configurable skip-if-no-key behavior - Debug logging for registration events Usage: # Create domain-specific metaclass class MicroscopeHandlerMeta(AutoRegisterMeta): def __new__(mcs, name, bases, attrs): return super().__new__(mcs, name, bases, attrs, registry_config=_MICROSCOPE_REGISTRY_CONFIG) # Use in class definition class ImageXpressHandler(MicroscopeHandler, metaclass=MicroscopeHandlerMeta): _microscope_type = 'imagexpress' # Optional if key_extractor is provided _metadata_handler_class = ImageXpressMetadata # Optional secondary registration Design Principles: - Explicit configuration over magic behavior - Preserve all domain-specific features - Zero breaking changes to existing code - Easy to understand and debug """
[docs] def __new__(mcs, name: str, bases: tuple, attrs: dict, registry_config: Optional[RegistryConfig] = None): """ Create a new class and register it if appropriate. Args: name: Name of the class being created bases: Base classes attrs: Class attributes dictionary registry_config: Configuration for registration behavior. If None, auto-configures from class attributes or skips registration. Returns: The newly created class """ registry_family = attrs.get('__registry_family__') # Create the class using ABCMeta new_class = super().__new__(mcs, name, bases, attrs) if isinstance(registry_family, RegistryFamily): registry_family.apply_to_class(new_class, attrs) # Auto-configure registry if not provided but class has __registry__ attributes if registry_config is None: registry_config = mcs._auto_configure_registry(new_class, attrs) if registry_config is None: return new_class # No config and no auto-config possible # Set up lazy discovery if registry dict supports it (only once for base class) if isinstance(registry_config.registry_dict, LazyDiscoveryDict) and not registry_config.registry_dict._config: from dataclasses import replace # Auto-infer discovery_package from base class module if not specified config = registry_config if config.discovery_package is None: # Extract package from base class module (e.g., 'openhcs.microscopes.microscope_base' → 'openhcs.microscopes') module_parts = new_class.__module__.rsplit('.', 1) inferred_package = module_parts[0] if len(module_parts) > 1 else new_class.__module__ # Create new config with inferred package config = replace(config, discovery_package=inferred_package) logger.debug(f"Auto-inferred discovery_package='{inferred_package}' from {new_class.__module__}") # Auto-infer discovery_recursive based on package structure # Check if package has subdirectories with __init__.py (indicating nested structure) if config.discovery_package: try: pkg = importlib.import_module(config.discovery_package) if hasattr(pkg, '__path__'): import os has_subpackages = False for path in pkg.__path__: if os.path.isdir(path): # Check if any subdirectories contain __init__.py for entry in os.listdir(path): subdir = os.path.join(path, entry) if os.path.isdir(subdir) and os.path.exists(os.path.join(subdir, '__init__.py')): has_subpackages = True break if has_subpackages: break # Only override if discovery_recursive is still at default (False) # This allows explicit overrides to take precedence if has_subpackages and not config.discovery_recursive: config = replace(config, discovery_recursive=True) logger.debug(f"Auto-inferred discovery_recursive=True for '{config.discovery_package}' (has subpackages)") elif not has_subpackages and config.discovery_recursive: logger.debug(f"Keeping explicit discovery_recursive=True for '{config.discovery_package}' (no subpackages detected)") except Exception as e: logger.debug(f"Failed to auto-infer discovery_recursive: {e}") # Auto-wrap secondary registries with SecondaryRegistryDict if config.secondary_registries: import sys wrapped_secondaries = [] module = sys.modules.get(new_class.__module__) for sec_reg in config.secondary_registries: # Check if secondary registry needs wrapping if isinstance(sec_reg.registry_dict, dict) and not isinstance(sec_reg.registry_dict, SecondaryRegistryDict): # Create a new SecondaryRegistryDict wrapping the primary registry wrapped_dict = SecondaryRegistryDict(registry_config.registry_dict) # Copy any existing entries from the old dict wrapped_dict.update(sec_reg.registry_dict) # Find and update the module global variable if module: for var_name, var_value in vars(module).items(): if var_value is sec_reg.registry_dict: setattr(module, var_name, wrapped_dict) logger.debug(f"Auto-wrapped secondary registry '{var_name}' in {new_class.__module__}") break # Create new SecondaryRegistry with wrapped dict wrapped_sec_reg = SecondaryRegistry( registry_dict=wrapped_dict, key_source=sec_reg.key_source, attr_name=sec_reg.attr_name ) wrapped_secondaries.append(wrapped_sec_reg) else: wrapped_secondaries.append(sec_reg) # Rebuild config with wrapped secondary registries config = replace(config, secondary_registries=wrapped_secondaries) registry_config.registry_dict._set_config(new_class, config) # Only register concrete classes (not abstract base classes) if not bases or getattr(new_class, '__abstractmethods__', None): return new_class # Get or derive the registration key key = mcs._get_registration_key(name, new_class, registry_config) # Handle missing key if key is None: return mcs._handle_missing_key(name, registry_config, new_class) # Register in primary registry mcs._register_class(new_class, key, registry_config) # Handle secondary registrations if registry_config.secondary_registries: mcs._register_secondary(new_class, key, registry_config.secondary_registries) # Log registration if enabled if registry_config.log_registration: logger.debug(f"Auto-registered {name} as '{key}' {registry_config.registry_name}") return new_class
@staticmethod def _get_registration_key(name: str, cls: Type, config: RegistryConfig) -> Optional[str]: """Get the registration key for a class (explicit or derived).""" # Try explicit key first key = getattr(cls, config.key_attribute, None) if key is not None: return key # Try key extractor if provided if config.key_extractor is not None: return config.key_extractor(name, cls) return None @staticmethod def _handle_missing_key(name: str, config: RegistryConfig, new_class: Type) -> Type: """Handle case where no registration key is available.""" if config.skip_if_no_key: if config.log_registration: logger.debug(f"Skipping registration for {name} - no {config.key_attribute}") return new_class # Return the class, just don't register it else: raise ValueError( f"Class {name} must have {config.key_attribute} attribute " f"or provide a key_extractor in registry config" ) @classmethod def _auto_configure_registry(mcs, new_class: Type, attrs: dict) -> Optional[RegistryConfig]: """ Auto-configure registry from metaclass OR base class attributes. Priority: 1. Metaclass attributes (__registry_dict__, __registry_key__ on metaclass) 2. Base class attributes (__registry_key__ on class, auto-create __registry__) 3. Parent class __registry__ (inherit from parent) Returns: RegistryConfig if auto-configuration successful, None otherwise """ # Check if the metaclass has __registry_dict__ attribute (old style) registry_dict = getattr(mcs, '__registry_dict__', None) # If no metaclass registry, check if base class wants auto-creation or inheritance if registry_dict is None: # First check if any parent class has __registry__ (inherit from parent) # This takes priority over creating a new registry for base in new_class.__mro__[1:]: # Skip self if hasattr(base, '__registry__'): registry_dict = base.__registry__ key_attribute = getattr(base, '__registry_key__', None) key_extractor = getattr(base, '__key_extractor__', None) skip_if_no_key = getattr(base, '__skip_if_no_key__', True) secondary_registries = getattr(base, '__secondary_registries__', None) registry_name = getattr(base, '__registry_name__', None) break else: # No parent registry found - check if class explicitly defines a # registry family or __registry_key__ (only create new registry # from the class body, or from an inherited registry protocol # mixin that does not own a registry itself). registry_family = attrs.get('__registry_family__') key_attribute = attrs.get('__registry_key__') if key_attribute is None and isinstance(registry_family, RegistryFamily): key_attribute = registry_family.key_attribute_name inherited_registry_base = None if key_attribute is None: for base in new_class.__mro__[1:]: inherited_key_attribute = getattr(base, '__registry_key__', None) if inherited_key_attribute is not None: inherited_registry_base = base key_attribute = inherited_key_attribute break if key_attribute is not None: # Check if class already provides its own __registry__ dict # (allows opting out of LazyDiscoveryDict) if '__registry__' in attrs: registry_dict = attrs['__registry__'] else: # Auto-create registry dict and store on the class registry_dict = LazyDiscoveryDict() new_class.__registry__ = registry_dict # Get other optional attributes from class. Explicit legacy # class attributes override the family/inherited declaration. key_extractor = attrs.get('__key_extractor__') if key_extractor is None and inherited_registry_base is not None: key_extractor = getattr( inherited_registry_base, '__key_extractor__', None ) if '__skip_if_no_key__' in attrs: skip_if_no_key = attrs['__skip_if_no_key__'] elif isinstance(registry_family, RegistryFamily): skip_if_no_key = registry_family.skip_if_no_key elif inherited_registry_base is not None: skip_if_no_key = getattr( inherited_registry_base, '__skip_if_no_key__', True ) else: skip_if_no_key = True secondary_registries = attrs.get('__secondary_registries__') if ( secondary_registries is None and inherited_registry_base is not None ): secondary_registries = getattr( inherited_registry_base, '__secondary_registries__', None ) if '__registry_name__' in attrs: registry_name = attrs['__registry_name__'] elif isinstance(registry_family, RegistryFamily): registry_name = registry_family.registry_name elif inherited_registry_base is not None: registry_name = getattr( inherited_registry_base, '__registry_name__', None ) else: registry_name = None else: return None # No registry configuration found else: # Old style: get from metaclass key_attribute = getattr(mcs, '__registry_key__', '_registry_key') key_extractor = getattr(mcs, '__key_extractor__', None) skip_if_no_key = getattr(mcs, '__skip_if_no_key__', True) secondary_registries = getattr(mcs, '__secondary_registries__', None) registry_name = getattr(mcs, '__registry_name__', None) # Auto-derive registry name if not provided if registry_name is None: # Derive from class name: "StorageBackend" → "storage backend" clean_name = new_class.__name__ for suffix in ['Base', 'Meta', 'Handler', 'Registry']: if clean_name.endswith(suffix): clean_name = clean_name[:-len(suffix)] break # Convert CamelCase to space-separated lowercase import re registry_name = re.sub(r'([A-Z])', r' \1', clean_name).strip().lower() logger.debug(f"Auto-configured registry for {new_class.__name__}: " f"key_attribute={key_attribute}, registry_name={registry_name}") return RegistryConfig( registry_dict=registry_dict, key_attribute=key_attribute, key_extractor=key_extractor, skip_if_no_key=skip_if_no_key, secondary_registries=secondary_registries, registry_name=registry_name ) @staticmethod def _register_class(cls: Type, key: str, config: RegistryConfig) -> None: """Register class in primary registry.""" config.registry_dict[key] = cls setattr(cls, config.key_attribute, key) @staticmethod def _register_secondary( cls: Type, primary_key: str, secondary_registries: list[SecondaryRegistry] ) -> None: """Handle secondary registry registrations.""" for sec_reg in secondary_registries: value = getattr(cls, sec_reg.attr_name, None) if value is None: continue # Determine the key for secondary registration if sec_reg.key_source == PRIMARY_KEY: secondary_key = primary_key else: secondary_key = getattr(cls, sec_reg.key_source, None) if secondary_key is None: logger.warning( f"Cannot register {sec_reg.attr_name} for {cls.__name__} - " f"no {sec_reg.key_source} attribute" ) continue # Register in secondary registry sec_reg.registry_dict[secondary_key] = value logger.debug(f"Auto-registered {sec_reg.attr_name} from {cls.__name__} as '{secondary_key}'")
[docs] class RegisteredEnumMeta(AutoRegisterMeta, EnumMeta): """Metaclass for enum families that also need AutoRegisterMeta membership."""
# Helper functions for common key extraction patterns
[docs] def extract_key_from_class_name(name: str, cls: Type) -> str: """Use the concrete class name as its registry key.""" return name
[docs] def make_suffix_extractor(suffix: str) -> KeyExtractor: """ Create a key extractor that removes a suffix from class names. Args: suffix: The suffix to remove (e.g., 'Handler', 'Backend') Returns: A key extractor function Examples: extract_handler = make_suffix_extractor('Handler') extract_handler('ImageXpressHandler', cls) -> 'imagexpress' extract_backend = make_suffix_extractor('Backend') extract_backend('DiskStorageBackend', cls) -> 'diskstorage' """ suffix_len = len(suffix) def extractor(name: str, cls: Type) -> str: if name.endswith(suffix): return name[:-suffix_len].lower() return name.lower() return extractor
# Pre-built extractors for common patterns extract_key_from_handler_suffix = make_suffix_extractor('Handler') extract_key_from_backend_suffix = make_suffix_extractor('Backend')