Source code for metaclass_registry.core
"""
Generic metaclass infrastructure for automatic plugin registration.
This module provides reusable metaclass infrastructure for Pattern A registry systems
(1:1 class-to-plugin mapping with automatic discovery). It eliminates code duplication
across MicroscopeHandlerMeta, StorageBackendMeta, and ContextProviderMeta.
Pattern Selection Guide:
-----------------------
Use AutoRegisterMeta (Pattern A) when:
- You have a 1:1 mapping between classes and plugins
- Plugins should be automatically discovered and registered
- Registration happens at class definition time
- Simple metadata (just a key and maybe one secondary registry)
Use Service Pattern (Pattern B) when:
- You have many-to-one mapping (multiple items per plugin)
- Complex metadata (FunctionMetadata with 8+ fields)
- Need aggregation across multiple sources
- Examples: Function registry, Format registry
Use Functional Registry (Pattern C) when:
- Simple type-to-handler mappings
- No state needed
- Functional programming style preferred
- Examples: Widget creation registries
Use Manual Registration (Pattern D) when:
- Complex initialization logic required
- Explicit control over registration timing needed
- Very few plugins (< 3)
- Examples: ZMQ servers, Pipeline steps
Architecture:
------------
AutoRegisterMeta uses a configuration-driven approach:
1. RegistryConfig defines registration behavior
2. AutoRegisterMeta applies the configuration during class creation
3. Domain-specific metaclasses provide thin wrappers with their config
This maintains domain-specific features while eliminating duplication.
"""
import importlib
import logging
import threading
from abc import ABCMeta
from dataclasses import dataclass
from enum import Enum
from enum import EnumMeta
from typing import Dict, Type, Optional, Callable, Any
logger = logging.getLogger(__name__)
# Type aliases for clarity
RegistryDict = Dict[str, Type]
KeyExtractor = Callable[[str, Type], str]
# Constants for key sources
PRIMARY_KEY = 'primary'
[docs]
class RegistryKeyAttribute(str, Enum):
"""Common class-attribute names used as registry keys."""
REGISTRY_KEY = "registry_key"
STRATEGY_KEY = "strategy_key"
STRATEGY_LABEL = "strategy_label"
VALUE_TYPE_LABEL = "value_type_label"
LAYOUT_KEY = "layout_key"
[docs]
class SecondaryRegistryDict(dict):
"""
Dict for secondary registries that auto-triggers primary registry discovery.
When accessed, this dict triggers discovery of the primary registry,
which populates both the primary and secondary registries.
"""
[docs]
def __init__(self, primary_registry: 'LazyDiscoveryDict'):
super().__init__()
self._primary_registry = primary_registry
def _ensure_discovered(self):
"""Trigger discovery of primary registry (which populates this secondary registry)."""
if hasattr(self._primary_registry, '_discover'):
self._primary_registry._discover()
def __getitem__(self, key):
self._ensure_discovered()
return super().__getitem__(key)
def __contains__(self, key):
self._ensure_discovered()
return super().__contains__(key)
def __iter__(self):
self._ensure_discovered()
return super().__iter__()
def __len__(self):
self._ensure_discovered()
return super().__len__()
[docs]
class LazyDiscoveryDict(dict):
"""
Dict that auto-discovers plugins on first access with optional caching.
Supports caching discovered plugins to speed up subsequent application starts.
Cache is validated against package version and file modification times.
Thread-safe: Uses locking to ensure discovery happens only once
even when accessed from multiple threads simultaneously.
"""
[docs]
def __init__(self, enable_cache: bool = True):
"""
Initialize lazy discovery dict.
Args:
enable_cache: If True, use caching to speed up discovery
"""
super().__init__()
self._base_class = None
self._config = None
self._discovered = False
self._enable_cache = enable_cache
self._cache_manager = None
self._discovery_lock = threading.RLock() # Reentrant lock for same-thread re-entry
self._discovery_package = None # Store for pickling support
_cache_components = None
[docs]
@classmethod
def cache_components(cls):
"""Return lazily imported cache components without a module helper."""
if cls._cache_components is None:
from metaclass_registry.cache import (
RegistryCacheManager,
CacheConfig,
serialize_plugin_class,
deserialize_plugin_class,
get_package_file_mtimes
)
cls._cache_components = {
'RegistryCacheManager': RegistryCacheManager,
'CacheConfig': CacheConfig,
'serialize_plugin_class': serialize_plugin_class,
'deserialize_plugin_class': deserialize_plugin_class,
'get_package_file_mtimes': get_package_file_mtimes
}
return cls._cache_components
def _set_config(self, base_class: Type, config: 'RegistryConfig') -> None:
self._base_class = base_class
self._config = config
self._discovery_package = config.discovery_package # Store for pickling support
# Initialize cache manager if caching is enabled
if self._enable_cache and config.discovery_package:
try:
cache_utils = self.cache_components()
self._cache_manager = cache_utils['RegistryCacheManager'](
cache_name=f"{config.registry_name.replace(' ', '_')}_registry",
version_getter=self._get_version,
serializer=cache_utils['serialize_plugin_class'],
deserializer=cache_utils['deserialize_plugin_class'],
config=cache_utils['CacheConfig'](
max_age_days=7,
check_mtimes=True # Validate file modifications
)
)
except Exception as e:
logger.debug(f"Failed to initialize cache manager: {e}")
self._cache_manager = None
def _get_version(self) -> str:
"""
Get version from the discovery package for cache validation.
Returns:
Version string or 'unknown' if unable to determine
"""
try:
# Try to get version from the root package
if self._discovery_package:
root_package = self._discovery_package.split('.')[0]
mod = __import__(root_package)
return getattr(mod, '__version__', 'unknown')
except:
pass
return "unknown"
def _discover(self) -> None:
"""
Run discovery once, using cache if available.
Thread-safe: Discovery happens inside the lock to ensure atomicity.
The lock is held for the entire duration to prevent other threads
from reading a partially-populated registry.
CRITICAL: No fast path check outside the lock! All threads must acquire
the lock to ensure they don't read a partially-populated registry.
"""
# No config = nothing to discover
if not self._config or not self._config.discovery_package:
return
# ALWAYS acquire lock - no fast path to avoid race condition
# RLock allows same thread to re-acquire during module imports
with self._discovery_lock:
# Check if already discovered (inside lock)
if self._discovered:
return
# Mark as discovered to prevent infinite re-entry from same thread
# (module imports during discovery might access registry)
self._discovered = True
# Try to load from cache first
if self._cache_manager:
try:
cached_plugins = self._cache_manager.load_cache()
if cached_plugins is not None:
# Reconstruct registry from cache
self.update(cached_plugins)
logger.info(
f"✅ Loaded {len(self)} {self._config.registry_name}s from cache"
)
return
except Exception as e:
logger.debug(f"Cache load failed for {self._config.registry_name}: {e}")
# Cache miss or disabled - perform full discovery
try:
pkg = importlib.import_module(self._config.discovery_package)
if self._config.discovery_function:
self._config.discovery_function(
pkg.__path__,
f"{self._config.discovery_package}.",
self._base_class
)
else:
from metaclass_registry.discovery import (
discover_registry_classes,
discover_registry_classes_recursive
)
func = (
discover_registry_classes_recursive
if self._config.discovery_recursive
else discover_registry_classes
)
func(pkg.__path__, f"{self._config.discovery_package}.", self._base_class)
logger.debug(f"Discovered {len(self)} {self._config.registry_name}s")
# Save to cache if enabled
if self._cache_manager:
try:
cache_utils = self.cache_components()
file_mtimes = cache_utils['get_package_file_mtimes'](
self._config.discovery_package
)
self._cache_manager.save_cache(dict(self), file_mtimes)
except Exception as e:
logger.debug(f"Failed to save cache for {self._config.registry_name}: {e}")
except Exception as e:
logger.warning(f"Discovery failed: {e}")
# Lock released here - registry is now fully populated and safe to read
def __getitem__(self, k):
with self._discovery_lock:
self._discover()
return super().__getitem__(k)
def __contains__(self, k):
with self._discovery_lock:
self._discover()
return super().__contains__(k)
def __iter__(self):
with self._discovery_lock:
self._discover()
return super().__iter__()
def __len__(self):
with self._discovery_lock:
self._discover()
return super().__len__()
[docs]
def get(self, k, default=None):
with self._discovery_lock:
self._discover()
return super().get(k, default)
[docs]
def __getstate__(self):
"""
Return state for pickling, excluding non-picklable objects.
The _discovery_lock and _cache_manager are excluded since they contain
non-picklable objects (RLock and potentially closures).
"""
state = dict(self) # Copy the dict contents
state.update({
'_base_class': self._base_class,
'_config': self._config,
'_discovered': self._discovered,
'_enable_cache': self._enable_cache,
'_discovery_package': self._discovery_package,
# Exclude: _discovery_lock, _cache_manager
})
return state
[docs]
def __setstate__(self, state):
"""
Restore state from pickle, recreating non-picklable objects.
The _discovery_lock is recreated as a new RLock.
The _cache_manager is set to None and will be reinitialized if needed.
"""
# Restore dict contents
for key, value in state.items():
if key.startswith('_'):
setattr(self, key, value)
else:
self[key] = value
# Recreate non-picklable objects
self._discovery_lock = threading.RLock()
self._cache_manager = None # Will be reinitialized if needed
[docs]
@dataclass(frozen=True)
class SecondaryRegistry:
"""Configuration for a secondary registry (e.g., metadata handlers)."""
registry_dict: RegistryDict
key_source: str # 'primary' or attribute name
attr_name: str # Attribute to check on the class
[docs]
@dataclass(frozen=True)
class RegistryFamily:
"""Declarative registry-family configuration for ``AutoRegisterMeta`` roots.
``AutoRegisterMeta`` historically used low-level class attributes such as
``__registry_key__`` and ``__skip_if_no_key__``. Those attributes remain
the compatibility surface, but registry roots can now declare one nominal
family object and let the metaclass install the legacy attributes.
"""
key_attribute: str | RegistryKeyAttribute
skip_if_no_key: bool = True
registry_name: Optional[str] = None
@property
def key_attribute_name(self) -> str:
"""Return the string attribute name consumed by ``AutoRegisterMeta``."""
if isinstance(self.key_attribute, RegistryKeyAttribute):
return self.key_attribute.value
return self.key_attribute
[docs]
def apply_to_class(self, cls: Type, explicit_attrs: dict) -> None:
"""Install legacy metaclass attributes for compatibility/introspection."""
if '__registry_key__' not in explicit_attrs:
cls.__registry_key__ = self.key_attribute_name
if '__skip_if_no_key__' not in explicit_attrs:
cls.__skip_if_no_key__ = self.skip_if_no_key
if self.registry_name is not None and '__registry_name__' not in explicit_attrs:
cls.__registry_name__ = self.registry_name
[docs]
@dataclass(frozen=True)
class RegistryConfig:
"""
Configuration for automatic class registration behavior.
This dataclass encapsulates all the configuration needed for metaclass
registration, making the pattern explicit and easy to understand.
Attributes:
registry_dict: Dictionary to register classes into (e.g., MICROSCOPE_HANDLERS)
key_attribute: Name of class attribute containing the registration key
(e.g., '_microscope_type', '_backend_type', '_context_type')
key_extractor: Optional function to derive key from class name if key_attribute
is not set. Signature: (class_name: str, cls: Type) -> str
skip_if_no_key: If True, skip registration when key_attribute is None.
If False, require either key_attribute or key_extractor.
secondary_registries: Optional list of secondary registry configurations
log_registration: If True, log debug message when class is registered
registry_name: Human-readable name for logging (e.g., 'microscope handler')
discovery_package: Optional package name to auto-discover (e.g., 'openhcs.microscopes')
discovery_recursive: If True, use recursive discovery (default: False)
Examples:
# Microscope handlers with name-based key extraction and secondary registry
RegistryConfig(
registry_dict=MICROSCOPE_HANDLERS,
key_attribute='_microscope_type',
key_extractor=extract_key_from_handler_suffix,
skip_if_no_key=False,
secondary_registries=[
SecondaryRegistry(
registry_dict=METADATA_HANDLERS,
key_source=PRIMARY_KEY,
attr_name='_metadata_handler_class'
)
],
log_registration=True,
registry_name='microscope handler'
)
# Storage backends with explicit key and skip-if-none behavior
RegistryConfig(
registry_dict=STORAGE_BACKENDS,
key_attribute='_backend_type',
skip_if_no_key=True,
registry_name='storage backend'
)
# Context providers with simple explicit key
RegistryConfig(
registry_dict=CONTEXT_PROVIDERS,
key_attribute='_context_type',
skip_if_no_key=True,
registry_name='context provider'
)
"""
registry_dict: RegistryDict
key_attribute: str
key_extractor: Optional[KeyExtractor] = None
skip_if_no_key: bool = False
secondary_registries: Optional[list[SecondaryRegistry]] = None
log_registration: bool = True
registry_name: str = "plugin"
discovery_package: Optional[str] = None # Auto-inferred from base class module if None
discovery_recursive: bool = False
discovery_function: Optional[Callable] = None # Custom discovery function
[docs]
class AutoRegisterMeta(ABCMeta):
"""
Generic metaclass for automatic plugin registration (Pattern A).
This metaclass automatically registers concrete classes in a global registry
when they are defined, eliminating the need for manual registration calls.
Features:
- Skips abstract classes (checks __abstractmethods__)
- Supports explicit keys via class attributes
- Supports derived keys via key extraction functions
- Supports secondary registries (e.g., metadata handlers)
- Configurable skip-if-no-key behavior
- Debug logging for registration events
Usage:
# Create domain-specific metaclass
class MicroscopeHandlerMeta(AutoRegisterMeta):
def __new__(mcs, name, bases, attrs):
return super().__new__(mcs, name, bases, attrs,
registry_config=_MICROSCOPE_REGISTRY_CONFIG)
# Use in class definition
class ImageXpressHandler(MicroscopeHandler, metaclass=MicroscopeHandlerMeta):
_microscope_type = 'imagexpress' # Optional if key_extractor is provided
_metadata_handler_class = ImageXpressMetadata # Optional secondary registration
Design Principles:
- Explicit configuration over magic behavior
- Preserve all domain-specific features
- Zero breaking changes to existing code
- Easy to understand and debug
"""
[docs]
def __new__(mcs, name: str, bases: tuple, attrs: dict,
registry_config: Optional[RegistryConfig] = None):
"""
Create a new class and register it if appropriate.
Args:
name: Name of the class being created
bases: Base classes
attrs: Class attributes dictionary
registry_config: Configuration for registration behavior.
If None, auto-configures from class attributes or skips registration.
Returns:
The newly created class
"""
registry_family = attrs.get('__registry_family__')
# Create the class using ABCMeta
new_class = super().__new__(mcs, name, bases, attrs)
if isinstance(registry_family, RegistryFamily):
registry_family.apply_to_class(new_class, attrs)
# Auto-configure registry if not provided but class has __registry__ attributes
if registry_config is None:
registry_config = mcs._auto_configure_registry(new_class, attrs)
if registry_config is None:
return new_class # No config and no auto-config possible
# Set up lazy discovery if registry dict supports it (only once for base class)
if isinstance(registry_config.registry_dict, LazyDiscoveryDict) and not registry_config.registry_dict._config:
from dataclasses import replace
# Auto-infer discovery_package from base class module if not specified
config = registry_config
if config.discovery_package is None:
# Extract package from base class module (e.g., 'openhcs.microscopes.microscope_base' → 'openhcs.microscopes')
module_parts = new_class.__module__.rsplit('.', 1)
inferred_package = module_parts[0] if len(module_parts) > 1 else new_class.__module__
# Create new config with inferred package
config = replace(config, discovery_package=inferred_package)
logger.debug(f"Auto-inferred discovery_package='{inferred_package}' from {new_class.__module__}")
# Auto-infer discovery_recursive based on package structure
# Check if package has subdirectories with __init__.py (indicating nested structure)
if config.discovery_package:
try:
pkg = importlib.import_module(config.discovery_package)
if hasattr(pkg, '__path__'):
import os
has_subpackages = False
for path in pkg.__path__:
if os.path.isdir(path):
# Check if any subdirectories contain __init__.py
for entry in os.listdir(path):
subdir = os.path.join(path, entry)
if os.path.isdir(subdir) and os.path.exists(os.path.join(subdir, '__init__.py')):
has_subpackages = True
break
if has_subpackages:
break
# Only override if discovery_recursive is still at default (False)
# This allows explicit overrides to take precedence
if has_subpackages and not config.discovery_recursive:
config = replace(config, discovery_recursive=True)
logger.debug(f"Auto-inferred discovery_recursive=True for '{config.discovery_package}' (has subpackages)")
elif not has_subpackages and config.discovery_recursive:
logger.debug(f"Keeping explicit discovery_recursive=True for '{config.discovery_package}' (no subpackages detected)")
except Exception as e:
logger.debug(f"Failed to auto-infer discovery_recursive: {e}")
# Auto-wrap secondary registries with SecondaryRegistryDict
if config.secondary_registries:
import sys
wrapped_secondaries = []
module = sys.modules.get(new_class.__module__)
for sec_reg in config.secondary_registries:
# Check if secondary registry needs wrapping
if isinstance(sec_reg.registry_dict, dict) and not isinstance(sec_reg.registry_dict, SecondaryRegistryDict):
# Create a new SecondaryRegistryDict wrapping the primary registry
wrapped_dict = SecondaryRegistryDict(registry_config.registry_dict)
# Copy any existing entries from the old dict
wrapped_dict.update(sec_reg.registry_dict)
# Find and update the module global variable
if module:
for var_name, var_value in vars(module).items():
if var_value is sec_reg.registry_dict:
setattr(module, var_name, wrapped_dict)
logger.debug(f"Auto-wrapped secondary registry '{var_name}' in {new_class.__module__}")
break
# Create new SecondaryRegistry with wrapped dict
wrapped_sec_reg = SecondaryRegistry(
registry_dict=wrapped_dict,
key_source=sec_reg.key_source,
attr_name=sec_reg.attr_name
)
wrapped_secondaries.append(wrapped_sec_reg)
else:
wrapped_secondaries.append(sec_reg)
# Rebuild config with wrapped secondary registries
config = replace(config, secondary_registries=wrapped_secondaries)
registry_config.registry_dict._set_config(new_class, config)
# Only register concrete classes (not abstract base classes)
if not bases or getattr(new_class, '__abstractmethods__', None):
return new_class
# Get or derive the registration key
key = mcs._get_registration_key(name, new_class, registry_config)
# Handle missing key
if key is None:
return mcs._handle_missing_key(name, registry_config, new_class)
# Register in primary registry
mcs._register_class(new_class, key, registry_config)
# Handle secondary registrations
if registry_config.secondary_registries:
mcs._register_secondary(new_class, key, registry_config.secondary_registries)
# Log registration if enabled
if registry_config.log_registration:
logger.debug(f"Auto-registered {name} as '{key}' {registry_config.registry_name}")
return new_class
@staticmethod
def _get_registration_key(name: str, cls: Type, config: RegistryConfig) -> Optional[str]:
"""Get the registration key for a class (explicit or derived)."""
# Try explicit key first
key = getattr(cls, config.key_attribute, None)
if key is not None:
return key
# Try key extractor if provided
if config.key_extractor is not None:
return config.key_extractor(name, cls)
return None
@staticmethod
def _handle_missing_key(name: str, config: RegistryConfig, new_class: Type) -> Type:
"""Handle case where no registration key is available."""
if config.skip_if_no_key:
if config.log_registration:
logger.debug(f"Skipping registration for {name} - no {config.key_attribute}")
return new_class # Return the class, just don't register it
else:
raise ValueError(
f"Class {name} must have {config.key_attribute} attribute "
f"or provide a key_extractor in registry config"
)
@classmethod
def _auto_configure_registry(mcs, new_class: Type, attrs: dict) -> Optional[RegistryConfig]:
"""
Auto-configure registry from metaclass OR base class attributes.
Priority:
1. Metaclass attributes (__registry_dict__, __registry_key__ on metaclass)
2. Base class attributes (__registry_key__ on class, auto-create __registry__)
3. Parent class __registry__ (inherit from parent)
Returns:
RegistryConfig if auto-configuration successful, None otherwise
"""
# Check if the metaclass has __registry_dict__ attribute (old style)
registry_dict = getattr(mcs, '__registry_dict__', None)
# If no metaclass registry, check if base class wants auto-creation or inheritance
if registry_dict is None:
# First check if any parent class has __registry__ (inherit from parent)
# This takes priority over creating a new registry
for base in new_class.__mro__[1:]: # Skip self
if hasattr(base, '__registry__'):
registry_dict = base.__registry__
key_attribute = getattr(base, '__registry_key__', None)
key_extractor = getattr(base, '__key_extractor__', None)
skip_if_no_key = getattr(base, '__skip_if_no_key__', True)
secondary_registries = getattr(base, '__secondary_registries__', None)
registry_name = getattr(base, '__registry_name__', None)
break
else:
# No parent registry found - check if class explicitly defines a
# registry family or __registry_key__ (only create new registry
# from the class body, or from an inherited registry protocol
# mixin that does not own a registry itself).
registry_family = attrs.get('__registry_family__')
key_attribute = attrs.get('__registry_key__')
if key_attribute is None and isinstance(registry_family, RegistryFamily):
key_attribute = registry_family.key_attribute_name
inherited_registry_base = None
if key_attribute is None:
for base in new_class.__mro__[1:]:
inherited_key_attribute = getattr(base, '__registry_key__', None)
if inherited_key_attribute is not None:
inherited_registry_base = base
key_attribute = inherited_key_attribute
break
if key_attribute is not None:
# Check if class already provides its own __registry__ dict
# (allows opting out of LazyDiscoveryDict)
if '__registry__' in attrs:
registry_dict = attrs['__registry__']
else:
# Auto-create registry dict and store on the class
registry_dict = LazyDiscoveryDict()
new_class.__registry__ = registry_dict
# Get other optional attributes from class. Explicit legacy
# class attributes override the family/inherited declaration.
key_extractor = attrs.get('__key_extractor__')
if key_extractor is None and inherited_registry_base is not None:
key_extractor = getattr(
inherited_registry_base, '__key_extractor__', None
)
if '__skip_if_no_key__' in attrs:
skip_if_no_key = attrs['__skip_if_no_key__']
elif isinstance(registry_family, RegistryFamily):
skip_if_no_key = registry_family.skip_if_no_key
elif inherited_registry_base is not None:
skip_if_no_key = getattr(
inherited_registry_base, '__skip_if_no_key__', True
)
else:
skip_if_no_key = True
secondary_registries = attrs.get('__secondary_registries__')
if (
secondary_registries is None
and inherited_registry_base is not None
):
secondary_registries = getattr(
inherited_registry_base, '__secondary_registries__', None
)
if '__registry_name__' in attrs:
registry_name = attrs['__registry_name__']
elif isinstance(registry_family, RegistryFamily):
registry_name = registry_family.registry_name
elif inherited_registry_base is not None:
registry_name = getattr(
inherited_registry_base, '__registry_name__', None
)
else:
registry_name = None
else:
return None # No registry configuration found
else:
# Old style: get from metaclass
key_attribute = getattr(mcs, '__registry_key__', '_registry_key')
key_extractor = getattr(mcs, '__key_extractor__', None)
skip_if_no_key = getattr(mcs, '__skip_if_no_key__', True)
secondary_registries = getattr(mcs, '__secondary_registries__', None)
registry_name = getattr(mcs, '__registry_name__', None)
# Auto-derive registry name if not provided
if registry_name is None:
# Derive from class name: "StorageBackend" → "storage backend"
clean_name = new_class.__name__
for suffix in ['Base', 'Meta', 'Handler', 'Registry']:
if clean_name.endswith(suffix):
clean_name = clean_name[:-len(suffix)]
break
# Convert CamelCase to space-separated lowercase
import re
registry_name = re.sub(r'([A-Z])', r' \1', clean_name).strip().lower()
logger.debug(f"Auto-configured registry for {new_class.__name__}: "
f"key_attribute={key_attribute}, registry_name={registry_name}")
return RegistryConfig(
registry_dict=registry_dict,
key_attribute=key_attribute,
key_extractor=key_extractor,
skip_if_no_key=skip_if_no_key,
secondary_registries=secondary_registries,
registry_name=registry_name
)
@staticmethod
def _register_class(cls: Type, key: str, config: RegistryConfig) -> None:
"""Register class in primary registry."""
config.registry_dict[key] = cls
setattr(cls, config.key_attribute, key)
@staticmethod
def _register_secondary(
cls: Type,
primary_key: str,
secondary_registries: list[SecondaryRegistry]
) -> None:
"""Handle secondary registry registrations."""
for sec_reg in secondary_registries:
value = getattr(cls, sec_reg.attr_name, None)
if value is None:
continue
# Determine the key for secondary registration
if sec_reg.key_source == PRIMARY_KEY:
secondary_key = primary_key
else:
secondary_key = getattr(cls, sec_reg.key_source, None)
if secondary_key is None:
logger.warning(
f"Cannot register {sec_reg.attr_name} for {cls.__name__} - "
f"no {sec_reg.key_source} attribute"
)
continue
# Register in secondary registry
sec_reg.registry_dict[secondary_key] = value
logger.debug(f"Auto-registered {sec_reg.attr_name} from {cls.__name__} as '{secondary_key}'")
[docs]
class RegisteredEnumMeta(AutoRegisterMeta, EnumMeta):
"""Metaclass for enum families that also need AutoRegisterMeta membership."""
# Helper functions for common key extraction patterns
[docs]
def extract_key_from_class_name(name: str, cls: Type) -> str:
"""Use the concrete class name as its registry key."""
return name
[docs]
def make_suffix_extractor(suffix: str) -> KeyExtractor:
"""
Create a key extractor that removes a suffix from class names.
Args:
suffix: The suffix to remove (e.g., 'Handler', 'Backend')
Returns:
A key extractor function
Examples:
extract_handler = make_suffix_extractor('Handler')
extract_handler('ImageXpressHandler', cls) -> 'imagexpress'
extract_backend = make_suffix_extractor('Backend')
extract_backend('DiskStorageBackend', cls) -> 'diskstorage'
"""
suffix_len = len(suffix)
def extractor(name: str, cls: Type) -> str:
if name.endswith(suffix):
return name[:-suffix_len].lower()
return name.lower()
return extractor
# Pre-built extractors for common patterns
extract_key_from_handler_suffix = make_suffix_extractor('Handler')
extract_key_from_backend_suffix = make_suffix_extractor('Backend')