Documentation
Overview
Package smarterbase provides database-like functionality using Redis (for indexes) + S3/GCS (for storage) instead of traditional databases, offering 85% cost savings while maintaining high durability and availability.
SmarterBase turns existing infrastructure (Redis for caching, S3 for files) into a queryable, transactional document store with zero database operations overhead. It provides:
- Secondary indexes with O(1) lookups via Redis
- Query interface for filtering, sorting, and pagination
- Optimistic transactions with rollback
- Distributed locking for race-free updates
- Batch operations for parallel reads/writes
- Schema versioning and migrations
- Boilerplate reduction helpers (85% less code for common patterns)
- Full observability (Prometheus metrics + structured logging)
Quick Start
Basic usage with filesystem backend (development):
backend := smarterbase.NewFilesystemBackend("./data")
store := smarterbase.NewStore(backend)
ctx := context.Background()
// Create
user := &User{ID: smarterbase.NewID(), Email: "alice@example.com"}
store.PutJSON(ctx, "users/"+user.ID, user)
// Read
var retrieved User
store.GetJSON(ctx, "users/"+user.ID, &retrieved)
Production setup with S3, Redis locking, and encryption:
// Initialize S3 and Redis
s3Client := s3.NewFromConfig(cfg)
redisClient := redis.NewClient(smarterbase.RedisOptions())
// Create production-safe backend with distributed locking
backend := smarterbase.NewS3BackendWithRedisLock(s3Client, "my-bucket", redisClient)
// Add encryption
encKey := loadFromSecretsManager() // 32-byte key
encBackend, _ := smarterbase.NewEncryptionBackend(backend, encKey)
// Add observability
logger, _ := smarterbase.NewProductionZapLogger()
metrics := smarterbase.NewPrometheusMetrics(prometheus.DefaultRegisterer)
store := smarterbase.NewStoreWithObservability(encBackend, logger, metrics)
Core Concepts
Backend: Storage abstraction layer supporting S3, GCS, filesystem, and MinIO. All data operations go through the Backend interface for portability.
Store: High-level API for JSON operations, queries, transactions, and batch operations. The Store wraps a Backend and provides convenience methods.
IndexManager: Coordinates automatic index updates across file-based and Redis indexes when creating, updating, or deleting entities.
Redis Indexing: Fast O(1) lookups for secondary indexes (e.g., email -> user_id, user_id -> [order_ids]). Enables queries without scanning all objects.
Distributed Locking: Redis-based locks prevent race conditions in multi-writer scenarios. Critical for S3 backends where native locking is unavailable.
Schema Versioning: Optional migration system for evolving JSON schemas over time without downtime. Migrations run automatically on read when version mismatches are detected.
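To make the Redis Indexing concept concrete, the following is a minimal in-memory sketch of a secondary index that covers both shapes mentioned above (email -> user_id and user_id -> [order_ids]). The memIndex type and its methods are hypothetical stand-ins built on Go maps; they are not part of smarterbase, which keeps these structures in Redis.

```go
package main

import (
	"fmt"
	"sort"
)

// memIndex is a hypothetical in-memory stand-in for a secondary index.
// Each index value maps to the set of object keys that carry that value.
type memIndex struct {
	entries map[string]map[string]struct{}
}

func newMemIndex() *memIndex {
	return &memIndex{entries: make(map[string]map[string]struct{})}
}

// Add records that objectKey carries indexValue.
func (m *memIndex) Add(indexValue, objectKey string) {
	set, ok := m.entries[indexValue]
	if !ok {
		set = make(map[string]struct{})
		m.entries[indexValue] = set
	}
	set[objectKey] = struct{}{}
}

// Remove drops objectKey from indexValue, for example when an entity is deleted.
func (m *memIndex) Remove(indexValue, objectKey string) {
	set := m.entries[indexValue]
	delete(set, objectKey) // deleting from a nil map is a no-op
	if len(set) == 0 {
		delete(m.entries, indexValue)
	}
}

// Query returns the object keys for indexValue in sorted order.
// Finding the set is a single map access; no stored objects are scanned.
func (m *memIndex) Query(indexValue string) []string {
	keys := make([]string, 0, len(m.entries[indexValue]))
	for k := range m.entries[indexValue] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	byUser := newMemIndex()
	byUser.Add("user-123", "orders/a")
	byUser.Add("user-123", "orders/b")
	byUser.Add("user-456", "orders/c")
	fmt.Println(byUser.Query("user-123"))
}
```

The point of the sketch is the access pattern: a query touches only the entry for one index value, which is why indexed lookups avoid listing every object under a prefix.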
Indexing and Queries
Register indexes for fast lookups:
redisIndexer := smarterbase.NewRedisIndexer(redisClient)
// Multi-value index (1:N): user_id -> [order1, order2, ...]
redisIndexer.RegisterMultiValueIndex("orders", "user_id", func(data []byte) (string, string, error) {
	var order Order
	if err := json.Unmarshal(data, &order); err != nil {
		return "", "", err
	}
	return order.ID, order.UserID, nil
})
// Create IndexManager for automatic updates
indexManager := smarterbase.NewIndexManager(store).WithRedisIndexer(redisIndexer)
// Create with automatic indexing
order := &Order{ID: smarterbase.NewID(), UserID: "user-123"}
indexManager.Create(ctx, "orders/"+order.ID, order)
// Query by index - O(1) lookup
orderIDs, _ := redisIndexer.QueryMultiValueIndex(ctx, "orders", "user_id", "user-123")
Query builder for filtering and sorting:
var users []*User
store.Query("users/").
	FilterJSON(func(obj map[string]interface{}) bool {
		// Comma-ok avoids a panic when "active" is missing or not a bool
		active, ok := obj["active"].(bool)
		return ok && active
	}).
	SortByField("created_at", false).
	Limit(50).
	All(ctx, &users)
Schema Versioning and Migrations
Evolve schemas without downtime:
// Version 0 (original schema)
type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}
// Version 1 (evolved schema)
type User struct {
	V         int    `json:"_v"`         // Add version field
	ID        string `json:"id"`
	FirstName string `json:"first_name"` // Split from name
	LastName  string `json:"last_name"`  // Split from name
}
// Register migration at app startup
func init() {
	smarterbase.Migrate("User").From(0).To(1).
		Split("name", " ", "first_name", "last_name")
}
// Old data migrates automatically on read
var user User
user.V = 1 // Set expected version
store.GetJSON(ctx, "users/123", &user) // Migration happens here
See ADR-0001 (docs/adr/0001-schema-versioning-and-migrations.md) for design rationale and migration patterns.
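As an illustration of what a Split step in the migration above could do to a stored document, here is a small sketch that operates on a decoded JSON map. The splitField helper is hypothetical and is not the library's implementation; in particular, removing the source field and keeping extra delimiters inside the last target are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// splitField is a hypothetical helper that mimics a Split migration step on a
// decoded JSON document: it splits doc[source] on delim, assigns the parts to
// targets in order, removes the source field, and stamps the new version.
func splitField(doc map[string]interface{}, source, delim string, toVersion int, targets ...string) {
	if raw, ok := doc[source].(string); ok {
		// SplitN keeps any extra delimiters inside the last target,
		// so "Mary Ann Smith" becomes "Mary" and "Ann Smith".
		parts := strings.SplitN(raw, delim, len(targets))
		for i, target := range targets {
			if i < len(parts) {
				doc[target] = parts[i]
			} else {
				doc[target] = "" // fewer parts than targets
			}
		}
		delete(doc, source)
	}
	doc["_v"] = toVersion
}

func main() {
	doc := map[string]interface{}{"id": "123", "name": "Alice Smith"}
	splitField(doc, "name", " ", 1, "first_name", "last_name")
	fmt.Println(doc["first_name"], doc["last_name"], doc["_v"])
}
```

Names with more than two words are the usual edge case for this kind of migration, so it is worth checking how the real Split treats them before relying on it.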
Atomic Updates and Distributed Locking
For critical operations requiring true isolation (financial transactions, inventory updates):
lock := smarterbase.NewDistributedLock(redisClient, "smarterbase")
err := smarterbase.WithAtomicUpdate(ctx, store, lock, "accounts/123", 10*time.Second,
	func(ctx context.Context) error {
		var account Account
		// Propagate read and write failures so the update is reported as failed
		if err := store.GetJSON(ctx, "accounts/123", &account); err != nil {
			return err
		}
		account.Balance += 100
		return store.PutJSON(ctx, "accounts/123", &account)
	})
For non-critical updates where eventual consistency is acceptable:
err := store.WithTransaction(ctx, func(tx *smarterbase.OptimisticTransaction) error {
	var user User
	if err := tx.Get(ctx, "users/123", &user); err != nil {
		return err
	}
	user.LastSeen = time.Now()
	tx.Put("users/123", user)
	return nil
})
Note: WithTransaction provides optimistic locking (NOT ACID). Use WithAtomicUpdate with distributed locks for operations requiring true isolation.
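The difference between optimistic locking and true isolation is easier to see in a small model. The sketch below is not smarterbase code: versionedStore is a hypothetical in-memory store whose per-key version counter stands in for an ETag, and putIfMatch plays the role of a conditional write. A writer that read a stale version is rejected and has to re-read and retry; nothing stops other writers from changing the data in between.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("concurrent modification detected")

// versionedStore is a hypothetical in-memory store. Each key carries a
// version counter that plays the role of an ETag.
type versionedStore struct {
	mu       sync.Mutex
	values   map[string]int
	versions map[string]int
}

func newVersionedStore() *versionedStore {
	return &versionedStore{values: map[string]int{}, versions: map[string]int{}}
}

// get returns the value and the version observed at read time.
func (s *versionedStore) get(key string) (value, version int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.values[key], s.versions[key]
}

// putIfMatch writes only if the key is still at the expected version.
func (s *versionedStore) putIfMatch(key string, value, expected int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.versions[key] != expected {
		return errConflict
	}
	s.values[key] = value
	s.versions[key]++
	return nil
}

// addWithRetry applies a read-modify-write and retries when it loses a race.
func addWithRetry(s *versionedStore, key string, delta, maxAttempts int) error {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		value, version := s.get(key)
		err := s.putIfMatch(key, value+delta, version)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
	}
	return errConflict
}

func main() {
	s := newVersionedStore()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := addWithRetry(s, "accounts/123", 100, 1000); err != nil {
				fmt.Println("gave up:", err)
			}
		}()
	}
	wg.Wait()
	value, _ := s.get("accounts/123")
	fmt.Println(value)
}
```

In this model every lost race costs a full re-read and re-write, which is cheap in memory but expensive against object storage. That cost is one reason to prefer a lock for hot keys such as account balances.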
Batch Operations
Efficient parallel operations:
// Type-safe batch read (recommended)
keys := []string{"users/1.json", "users/2.json", "users/3.json"}
users, err := smarterbase.BatchGet[User](ctx, store, keys)
// Batch write
items := map[string]interface{}{
	"users/1": &User{ID: "1", Email: "user1@example.com"},
	"users/2": &User{ID: "2", Email: "user2@example.com"},
}
results := store.BatchPutJSON(ctx, items)
Helper Functions
The package provides several helper functions for common patterns:
BatchGet[T] - Type-safe batch reads with automatic unmarshaling
BatchGetWithErrors[T] - Like BatchGet but returns errors per-key
GetByIndex[T] - Fetch a single entity by index value
QueryIndexTyped[T] - Query an index and return typed results
RedisOptions() - Production-ready Redis configuration from environment variables
See ADR-0005 (docs/adr/0005-core-api-helpers-guidance.md) for usage guidance.
Boilerplate Reduction Helpers (ADR-0006)
Three focused helpers eliminate 85-90% of repetitive patterns:
QueryWithFallback[T] - Redis → scan fallback with automatic profiling (50 → 6 lines):
admins, err := smarterbase.QueryWithFallback[User](
	ctx, store, redisIndexer,
	"users", "role", "admin", // Redis index lookup
	"users/", // Fallback scan prefix
	func(u *User) bool { return u.Role == "admin" }, // Fallback filter
)
UpdateWithIndexes - Atomic update with coordinated index updates:
err := smarterbase.UpdateWithIndexes(
	ctx, store, redisIndexer,
	"users/user-123.json", user,
	[]smarterbase.IndexUpdate{
		// oldEmail and newEmail hold the indexed value before and after the change
		{EntityType: "users", IndexField: "email", OldValue: oldEmail, NewValue: newEmail},
	},
)
BatchGetWithFilter[T] - Load and filter in one call:
active, err := smarterbase.BatchGetWithFilter[User](
	ctx, store, keys,
	func(u *User) bool { return u.Active },
)
See ADR-0006 (docs/adr/0006-collection-api.md) and examples/production-patterns/ for details.
Critical Gotchas
1. S3 Race Conditions: Always use S3BackendWithRedisLock for production multi-writer scenarios. Plain S3Backend has a race window in PutIfMatch operations.
2. Transactions Are NOT ACID: WithTransaction() does NOT provide isolation. Another process can modify data during the transaction. Use WithAtomicUpdate() + distributed locks for critical operations.
3. Memory Usage: Query.All() loads everything into memory. Use Each() or pagination for large datasets.
4. Index Drift: Enable IndexHealthMonitor to auto-detect and repair stale Redis indexes.
5. S3 Latency: Base latency is 50-100ms. Add caching for hot data if sub-millisecond response times are required.
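For the caching advice in gotcha 5, one possible shape is a small read-through cache with a time-to-live in front of the slow read path. The sketch below is an assumption-laden illustration, not a smarterbase feature: cachedReader, its fields, and the fetch callback are hypothetical names, and the fetch function stands in for whatever call reads from S3.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedReader is a hypothetical read-through cache placed in front of a
// slow fetch function (for example, one that reads from S3).
type cachedReader struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time // injectable clock, handy for tests
	fetch   func(key string) ([]byte, error)
	entries map[string]cacheEntry
}

type cacheEntry struct {
	data    []byte
	expires time.Time
}

func newCachedReader(ttl time.Duration, fetch func(string) ([]byte, error)) *cachedReader {
	return &cachedReader{ttl: ttl, now: time.Now, fetch: fetch, entries: map[string]cacheEntry{}}
}

// Get serves unexpired entries from memory and falls back to fetch otherwise.
// Holding the mutex during fetch keeps the sketch short but serializes misses.
func (c *cachedReader) Get(key string) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok && c.now().Before(e.expires) {
		return e.data, nil
	}
	data, err := c.fetch(key)
	if err != nil {
		return nil, err // errors are not cached
	}
	c.entries[key] = cacheEntry{data: data, expires: c.now().Add(c.ttl)}
	return data, nil
}

func main() {
	calls := 0
	reader := newCachedReader(time.Minute, func(key string) ([]byte, error) {
		calls++ // count how often the slow path runs
		return []byte(`{"id":"` + key + `"}`), nil
	})
	reader.Get("users/1")
	reader.Get("users/1")
	fmt.Println("backend calls:", calls)
}
```

A cache like this trades freshness for latency: readers can see data that is up to one TTL old, so it suits hot, rarely changing objects rather than balances or counters.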
Storage Backends
Filesystem (development):
backend := smarterbase.NewFilesystemBackend("./storage")
S3 (production - recommended):
backend := smarterbase.NewS3BackendWithRedisLock(s3Client, "my-bucket", redisClient)
Google Cloud Storage:
backend := smarterbase.NewGCSBackend(ctx, smarterbase.GCSConfig{
	ProjectID: "my-project",
	Bucket:    "my-bucket",
})
MinIO / S3-compatible:
backend := smarterbase.NewMinIOBackend(smarterbase.MinIOConfig{
	Endpoint:  "localhost:9000",
	Bucket:    "my-bucket",
	AccessKey: "minioadmin",
	SecretKey: "minioadmin",
})
Encryption wrapper (any backend):
encBackend, _ := smarterbase.NewEncryptionBackend(backend, encryptionKey)
When to Use SmarterBase
Perfect for:
- User management (profiles, preferences, settings)
- Configuration storage (app configs, feature flags)
- Content management (blog posts, articles, pages)
- Order/invoice storage (e-commerce transactions)
- Metadata catalogs (file metadata, asset tracking)
- Event logs (audit trails, activity logs)
- API caching (long-lived cached responses)
Not suitable for:
- Complex JOINs across multiple entity types
- Real-time aggregations (SUM, COUNT, GROUP BY)
- Strict ACID transactions
- Sub-millisecond response times at scale
- Full-text search (use Elasticsearch)
- Graph queries (use Neo4j)
- Time-series analytics (use TimescaleDB)
Observability
Metrics (Prometheus):
metrics := smarterbase.NewPrometheusMetrics(prometheus.DefaultRegisterer)
metrics.RegisterAll()
store := smarterbase.NewStoreWithObservability(backend, logger, metrics)
Logging (Zap structured logging):
logger, _ := smarterbase.NewProductionZapLogger()
store := smarterbase.NewStoreWithObservability(backend, logger, metrics)
Index health monitoring with auto-repair:
monitor := smarterbase.NewIndexHealthMonitor(store, redisIndexer)
monitor.Start(ctx) // Checks every 5 minutes, auto-repairs drift >5%
defer monitor.Stop()
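To give a feel for what a drift percentage means, here is one plausible way to compute it over a sample of objects. How IndexHealthMonitor actually samples and measures drift is not described in this document, so driftPercent and its inputs are hypothetical; only the 5% threshold comes from the comment above.

```go
package main

import "fmt"

// driftPercent is a hypothetical helper. expected maps each sampled object key
// to the index value derived from the stored object; indexed maps the same
// keys to what the index currently holds. The result is the percentage of
// sampled objects whose index entry is missing or stale.
func driftPercent(expected, indexed map[string]string) float64 {
	if len(expected) == 0 {
		return 0
	}
	stale := 0
	for objectKey, want := range expected {
		if got, ok := indexed[objectKey]; !ok || got != want {
			stale++
		}
	}
	return 100 * float64(stale) / float64(len(expected))
}

func main() {
	expected := map[string]string{
		"orders/a": "user-1", "orders/b": "user-1",
		"orders/c": "user-2", "orders/d": "user-2",
	}
	indexed := map[string]string{
		"orders/a": "user-1", "orders/b": "user-9", // stale entry
		"orders/c": "user-2", "orders/d": "user-2",
	}
	const threshold = 5.0 // percent, matching the auto-repair threshold above
	drift := driftPercent(expected, indexed)
	fmt.Printf("drift=%.1f%% repair=%v\n", drift, drift > threshold)
}
```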
Documentation and Examples
Package documentation:
- README.md - Comprehensive guide with examples
- DATASHEET.md - Technical specifications and API reference
- CHANGELOG.md - Version history and release notes
Architecture Decision Records (ADRs):
- docs/adr/0001-schema-versioning-and-migrations.md - Migration system design
- docs/adr/0002-redis-configuration-ergonomics.md - Redis config patterns
- docs/adr/0003-simple-api-layer.md - Simple API design (optional high-level API)
- docs/adr/0004-simple-api-versioning.md - Simple API versioning strategy
- docs/adr/0005-core-api-helpers-guidance.md - When to use helper functions
- docs/adr/0006-collection-api.md - Boilerplate reduction helpers (QueryWithFallback, etc.)
Working examples:
- examples/simple/ - Progressive tutorials (quickstart to versioning)
- examples/schema-migrations/ - Schema evolution patterns
- examples/user-management/ - CRUD with Redis indexing
- examples/ecommerce-orders/ - Order management with atomic updates
- examples/production-patterns/ - Complete production setup
- examples/multi-tenant-config/ - Multi-tenant scenarios
- examples/metrics-dashboard/ - Observability integration
- examples/event-logging/ - JSONL append-only logs
AI Assistant Context:
- .ai-context - Quick reference for LLMs working with this codebase
Performance Characteristics
Latency (typical):
- Filesystem Get: 1-3ms
- S3 Get: 50-80ms
- Put with indexes: +5-10ms (Redis updates)
- Distributed lock: +2-5ms (no contention)
Throughput:
- Filesystem: 10,000+ ops/sec (with striped locks)
- S3: Up to 3,500 PUT/sec per prefix (AWS limit)
Scalability:
- Tested with millions of objects
- Redis can handle billions of index entries
- S3 places no practical limit on object count or total storage
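The filesystem throughput figure above mentions striped locks. As background, lock striping hashes each key onto a fixed pool of mutexes so that unrelated keys rarely wait on each other while the number of locks stays bounded. The sketch below illustrates the general technique only; how the package's own StripedLocks type is implemented is not shown in this document, and the names here are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// stripedLocks is a hypothetical sketch of lock striping: keys hash onto a
// fixed pool of mutexes instead of each key owning its own lock.
type stripedLocks struct {
	stripes []sync.Mutex
}

func newStripedLocks(n int) *stripedLocks {
	return &stripedLocks{stripes: make([]sync.Mutex, n)}
}

// stripeFor maps a key to a stripe index; the same key always gets the same stripe.
func (s *stripedLocks) stripeFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(len(s.stripes)))
}

// withLock runs fn while holding the stripe that key hashes to.
func (s *stripedLocks) withLock(key string, fn func()) {
	m := &s.stripes[s.stripeFor(key)]
	m.Lock()
	defer m.Unlock()
	fn()
}

func main() {
	locks := newStripedLocks(32)
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All goroutines use the same key, so they share one stripe
			// and the increment is race-free.
			locks.withLock("users/1", func() { counter++ })
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}
```

Two different keys can land on the same stripe, which costs some extra waiting but never correctness; a larger stripe count lowers that chance.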
Repository and License
Repository: https://github.com/adrianmcphee/smarterbase
License: MIT License - See LICENSE file for details
Issues and feature requests: https://github.com/adrianmcphee/smarterbase/issues
Security: See SECURITY.md for reporting vulnerabilities
Contributing: See CONTRIBUTING.md for development guidelines
Index
- Constants
- Variables
- func AutoRegisterIndexes(redisIndexer *RedisIndexer, entityType string, example interface{}) error
- func BatchGet[T any](ctx context.Context, store *Store, keys []string) ([]*T, error)
- func BatchGetWithErrors[T any](ctx context.Context, store *Store, keys []string) ([]*T, map[string]error)
- func BatchGetWithFilter[T any](ctx context.Context, store *Store, keys []string, filter func(*T) bool) ([]*T, error)
- func DetectCircularCascade(cascades map[string][]CascadeSpec) error
- func ExtractIDFromCascadeKey(key string) string
- func ExtractJSONField(fieldName string) func(objectKey string, data []byte) ([]IndexEntry, error)
- func ExtractJSONFieldForConstraint(fieldName string) func(data interface{}) (string, error)
- func ExtractNestedJSONField(fieldPath ...string) func(objectKey string, data []byte) ([]IndexEntry, error)
- func GetByIndex[T any](ctx context.Context, im *IndexManager, entityType, field, value string) (*T, error)
- func GetJSON(backend Backend, ctx context.Context, key string, dest interface{}) error
- func IsConflict(err error) bool
- func IsConstraintViolation(err error) bool
- func IsNotFound(err error) bool
- func IsPermanent(err error) bool
- func IsRetryable(err error) bool
- func IsValidID(s string) bool
- func NewID() string
- func NormalizeEmail(email string) string
- func NormalizeString(s string) string
- func Now() time.Time
- func ParseID(s string) (uuid.UUID, error)
- func PutJSON(backend Backend, ctx context.Context, key string, value interface{}) error
- func QueryIndexTyped[T any](ctx context.Context, im *IndexManager, entityType, field, value string) ([]*T, error)
- func QueryWithFallback[T any](ctx context.Context, store *Store, redisIndexer *RedisIndexer, ...) ([]*T, error)
- func RedisOptions() *redis.Options
- func RedisOptionsWithOverrides(addr, password string, poolSize, minIdleConns int) *redis.Options
- func RegisterIndexesForType(cfg IndexConfig, example interface{}, manualIndexes func()) error
- func UnmarshalBatchResults[T any](results map[string]interface{}) ([]*T, error)
- func UpdateWithIndexes(ctx context.Context, store *Store, redisIndexer *RedisIndexer, key string, ...) error
- func ValidateCascadeSpec(spec CascadeSpec) error
- func WithAtomicUpdate(ctx context.Context, store *Store, lock *DistributedLock, key string, ...) error
- func WithContext(err error, context map[string]interface{}) error
- func WithProfiler(ctx context.Context, profiler *QueryProfiler) context.Context
- type AuditOptions
- type AuditReport
- type Backend
- type BackendConfig
- type BatchOperation
- type BatchOperationResult
- type BatchWriter
- type CascadeDeleteFunc
- type CascadeIndexManager
- func (cim *CascadeIndexManager) DeleteWithCascade(ctx context.Context, parentEntityType string, key string, parentID string) error
- func (cim *CascadeIndexManager) RegisterCascade(parentEntityType string, spec CascadeSpec)
- func (cim *CascadeIndexManager) RegisterCascadeChain(parentEntityType string, specs []CascadeSpec)
- type CascadeManager
- func (cm *CascadeManager) ExecuteCascadeDelete(ctx context.Context, parentEntityType string, parentID string, ...) error
- func (cm *CascadeManager) GetCascadeTree() map[string][]string
- func (cm *CascadeManager) PrintCascadeTree() string
- func (cm *CascadeManager) Register(parentEntityType string, spec CascadeSpec)
- func (cm *CascadeManager) RegisterChain(parentEntityType string, specs []CascadeSpec)
- type CascadeSpec
- type CircuitBreaker
- type ConstraintManager
- func (cm *ConstraintManager) ClaimUniqueKeys(ctx context.Context, entityType, objectKey string, data interface{}) ([]string, error)
- func (cm *ConstraintManager) RebuildConstraints(ctx context.Context, entityType string, objects map[string]interface{}) error
- func (cm *ConstraintManager) RegisterConstraint(constraint *UniqueConstraint)
- func (cm *ConstraintManager) ReleaseUniqueKeys(ctx context.Context, claimedKeys []string) error
- func (cm *ConstraintManager) UpdateUniqueKeys(ctx context.Context, entityType, objectKey string, ...) ([]string, error)
- func (cm *ConstraintManager) VerifyConstraint(ctx context.Context, entityType, fieldName, value, expectedKey string) (bool, error)
- type ConstraintViolationError
- type Counter
- type CounterAudit
- func (ca *CounterAudit) Audit(ctx context.Context, opts *AuditOptions) (*AuditReport, error)
- func (ca *CounterAudit) GetCounterInfo(ctx context.Context, key string) (*CounterInfo, error)
- func (ca *CounterAudit) ListCounters(ctx context.Context, pattern string) ([]string, error)
- func (ca *CounterAudit) RepairCounter(ctx context.Context, key string, suggestedValue int64) error
- type CounterInfo
- type DistributedLock
- type EncryptionBackend
- func (e *EncryptionBackend) Append(ctx context.Context, key string, data []byte) error
- func (e *EncryptionBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (e *EncryptionBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
- func (e *EncryptionBackend) GetWithETag(ctx context.Context, key string) ([]byte, string, error)
- func (e *EncryptionBackend) Put(ctx context.Context, key string, data []byte) error
- func (e *EncryptionBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
- func (e *EncryptionBackend) PutStream(ctx context.Context, key string, reader io.Reader, size int64) error
- type ErrorWithContext
- type FilesystemBackend
- func (b *FilesystemBackend) Append(ctx context.Context, key string, data []byte) error
- func (b *FilesystemBackend) Close() error
- func (b *FilesystemBackend) Delete(ctx context.Context, key string) error
- func (b *FilesystemBackend) Exists(ctx context.Context, key string) (bool, error)
- func (b *FilesystemBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (b *FilesystemBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
- func (b *FilesystemBackend) GetWithETag(ctx context.Context, key string) ([]byte, string, error)
- func (b *FilesystemBackend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *FilesystemBackend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
- func (b *FilesystemBackend) Ping(ctx context.Context) error
- func (b *FilesystemBackend) Put(ctx context.Context, key string, data []byte) error
- func (b *FilesystemBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
- func (b *FilesystemBackend) PutStream(ctx context.Context, key string, reader io.Reader, size int64) error
- type FilesystemBackendWithRedisLock
- type GCSBackend
- func (b *GCSBackend) Append(ctx context.Context, key string, data []byte) error
- func (b *GCSBackend) Close() error
- func (b *GCSBackend) Delete(ctx context.Context, key string) error
- func (b *GCSBackend) Exists(ctx context.Context, key string) (bool, error)
- func (b *GCSBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (b *GCSBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
- func (b *GCSBackend) GetWithETag(ctx context.Context, key string) ([]byte, string, error)
- func (b *GCSBackend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *GCSBackend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
- func (b *GCSBackend) Ping(ctx context.Context) error
- func (b *GCSBackend) Put(ctx context.Context, key string, data []byte) error
- func (b *GCSBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
- func (b *GCSBackend) PutStream(ctx context.Context, key string, reader io.Reader, size int64) error
- type GCSConfig
- type InMemoryMetrics
- func (m *InMemoryMetrics) Gauge(name string, value float64, tags ...string)
- func (m *InMemoryMetrics) Histogram(name string, value float64, tags ...string)
- func (m *InMemoryMetrics) Increment(name string, tags ...string)
- func (m *InMemoryMetrics) Timing(name string, duration time.Duration, tags ...string)
- type IndexConfig
- type IndexEntry
- type IndexHealthMonitor
- func (ihm *IndexHealthMonitor) Check(ctx context.Context, entityType string) (*IndexHealthReport, error)
- func (ihm *IndexHealthMonitor) RepairDrift(ctx context.Context, report *IndexHealthReport) error
- func (ihm *IndexHealthMonitor) Start(ctx context.Context) error
- func (ihm *IndexHealthMonitor) Stop()
- func (ihm *IndexHealthMonitor) WithAutoRepair(enabled bool) *IndexHealthMonitor
- func (ihm *IndexHealthMonitor) WithDriftThreshold(threshold float64) *IndexHealthMonitor
- func (ihm *IndexHealthMonitor) WithInterval(interval time.Duration) *IndexHealthMonitor
- func (ihm *IndexHealthMonitor) WithSampleSize(size int) *IndexHealthMonitor
- type IndexHealthReport
- type IndexManager
- func (im *IndexManager) Create(ctx context.Context, key string, data interface{}) error
- func (im *IndexManager) Delete(ctx context.Context, key string) error
- func (im *IndexManager) Exists(ctx context.Context, key string) (bool, error)
- func (im *IndexManager) Get(ctx context.Context, key string, dest interface{}) error
- func (im *IndexManager) Update(ctx context.Context, key string, newData interface{}) error
- func (im *IndexManager) WithConstraintManager(manager *ConstraintManager) *IndexManager
- func (im *IndexManager) WithRedisIndexer(indexer *RedisIndexer) *IndexManager
- type IndexRepairService
- type IndexTag
- type IndexUpdate
- type KeyBuilder
- type LockInfo
- type LockManager
- func (lm *LockManager) CleanupOrphanedLocks(ctx context.Context, minAge time.Duration) (int, error)
- func (lm *LockManager) ForceRelease(ctx context.Context, resourceKey string) error
- func (lm *LockManager) GetLockInfo(ctx context.Context, resourceKey string) (*LockInfo, error)
- func (lm *LockManager) ListLocks(ctx context.Context) ([]LockInfo, error)
- type Logger
- type MethodStats
- type Metrics
- type MetricsExporter
- type MetricsRecorder
- type MigrationBuilder
- func (b *MigrationBuilder) AddField(field string, defaultValue interface{}) *MigrationBuilder
- func (b *MigrationBuilder) Do(fn MigrationFunc) *MigrationBuilder
- func (b *MigrationBuilder) From(version int) *MigrationBuilder
- func (b *MigrationBuilder) RemoveField(field string) *MigrationBuilder
- func (b *MigrationBuilder) RenameField(oldName, newName string) *MigrationBuilder
- func (b *MigrationBuilder) Split(sourceField, delimiter string, targetFields ...string) *MigrationBuilder
- func (b *MigrationBuilder) To(version int) *MigrationBuilder
- type MigrationFunc
- type MigrationPolicy
- type MigrationRegistry
- type MinIOConfig
- type MultiIndexSpec
- type NoOpLogger
- type NoOpMetrics
- type OptimisticTransaction
- func (tx *OptimisticTransaction) Commit(ctx context.Context) error
- func (tx *OptimisticTransaction) Delete(key string)
- func (tx *OptimisticTransaction) Get(ctx context.Context, key string, dest interface{}) error
- func (tx *OptimisticTransaction) Put(key string, value interface{})
- func (tx *OptimisticTransaction) Rollback(ctx context.Context) error
- type ProfileSummary
- type PrometheusMetrics
- func (p *PrometheusMetrics) Gauge(name string, value float64, tags ...string)
- func (p *PrometheusMetrics) GetRegistry() *prometheus.Registry
- func (p *PrometheusMetrics) Histogram(name string, value float64, tags ...string)
- func (p *PrometheusMetrics) Increment(name string, tags ...string)
- func (p *PrometheusMetrics) Timing(name string, duration time.Duration, tags ...string)
- type Query
- func (q *Query) All(ctx context.Context, dest interface{}) error
- func (q *Query) Count(ctx context.Context) (int, error)
- func (q *Query) Each(ctx context.Context, fn func(key string, data []byte) error) error
- func (q *Query) Filter(fn func(data []byte) bool) *Query
- func (q *Query) FilterJSON(fn func(obj map[string]interface{}) bool) *Query
- func (q *Query) First(ctx context.Context, dest interface{}) error
- func (q *Query) Limit(n int) *Query
- func (q *Query) Offset(n int) *Query
- func (q *Query) Sort(fn func(a, b []byte) bool) *Query
- func (q *Query) SortByField(fieldName string, ascending bool) *Query
- type QueryBuilder
- type QueryComplexity
- type QueryProfile
- type QueryProfiler
- func (p *QueryProfiler) Clear()
- func (p *QueryProfiler) GetFallbacks() []QueryProfile
- func (p *QueryProfiler) GetFullScans() []QueryProfile
- func (p *QueryProfiler) GetProfiles() []QueryProfile
- func (p *QueryProfiler) GetSlowQueries() []QueryProfile
- func (p *QueryProfiler) GetSummary() ProfileSummary
- func (p *QueryProfiler) PrintSummary()
- func (p *QueryProfiler) Record(profile *QueryProfile)
- func (p *QueryProfiler) SetEnabled(enabled bool)
- func (p *QueryProfiler) SetSlowQueryThreshold(d time.Duration)
- func (p *QueryProfiler) StartProfile(method string) *QueryProfile
- type RedisIndexer
- func (r *RedisIndexer) Close() error
- func (r *RedisIndexer) Count(ctx context.Context, entityType, indexName, indexValue string) (int64, error)
- func (r *RedisIndexer) GetIndexStats(ctx context.Context, entityType, indexName string, indexValues []string) (map[string]int64, error)
- func (r *RedisIndexer) Query(ctx context.Context, entityType, indexName, indexValue string) ([]string, error)
- func (r *RedisIndexer) QueryMultiple(ctx context.Context, entityType, indexName string, indexValues []string) ([]string, error)
- func (r *RedisIndexer) RebuildIndex(ctx context.Context, spec *MultiIndexSpec, objects map[string][]byte) error
- func (r *RedisIndexer) RegisterMultiIndex(spec *MultiIndexSpec)
- func (r *RedisIndexer) RemoveFromIndexes(ctx context.Context, objectKey string, data []byte) error
- func (r *RedisIndexer) ReplaceIndexes(ctx context.Context, objectKey string, oldData, newData []byte) error
- func (r *RedisIndexer) UpdateIndexes(ctx context.Context, objectKey string, data []byte) error
- type RepairReport
- type RetryConfig
- type S3Backend
- func (b *S3Backend) Append(ctx context.Context, key string, data []byte) error
- func (b *S3Backend) Close() error
- func (b *S3Backend) Delete(ctx context.Context, key string) error
- func (b *S3Backend) Exists(ctx context.Context, key string) (bool, error)
- func (b *S3Backend) Get(ctx context.Context, key string) ([]byte, error)
- func (b *S3Backend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
- func (b *S3Backend) GetWithETag(ctx context.Context, key string) ([]byte, string, error)
- func (b *S3Backend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *S3Backend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
- func (b *S3Backend) Ping(ctx context.Context) error
- func (b *S3Backend) Put(ctx context.Context, key string, data []byte) error
- func (b *S3Backend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
- func (b *S3Backend) PutStream(ctx context.Context, key string, reader io.Reader, size int64) error
- type S3BackendWithRedisLock
- type StdLogger
- type Store
- func (s *Store) Backend() Backend
- func (s *Store) BatchDelete(ctx context.Context, keys []string) []BatchOperation
- func (s *Store) BatchExists(ctx context.Context, keys []string) map[string]bool
- func (s *Store) BatchGetJSON(ctx context.Context, keys []string, destType interface{}) (map[string]interface{}, error)
- func (s *Store) BatchPutJSON(ctx context.Context, items map[string]interface{}) []BatchOperation
- func (s *Store) BeginTx(ctx context.Context) *OptimisticTransaction
- func (s *Store) Close() error
- func (s *Store) Delete(ctx context.Context, key string) error
- func (s *Store) Exists(ctx context.Context, key string) (bool, error)
- func (s *Store) GetJSON(ctx context.Context, key string, dest interface{}) error
- func (s *Store) GetJSONWithETag(ctx context.Context, key string, dest interface{}) (string, error)
- func (s *Store) List(ctx context.Context, prefix string) ([]string, error)
- func (s *Store) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
- func (s *Store) MarshalObject(value interface{}) ([]byte, error)
- func (s *Store) NewBatchWriter(batchSize int) *BatchWriter
- func (s *Store) Ping(ctx context.Context) error
- func (s *Store) PutJSON(ctx context.Context, key string, value interface{}) error
- func (s *Store) PutJSONWithETag(ctx context.Context, key string, value interface{}, expectedETag string) (string, error)
- func (s *Store) Query(prefix string) *Query
- func (s *Store) SetLogger(logger Logger)
- func (s *Store) SetMetrics(metrics Metrics)
- func (s *Store) WithMigrationPolicy(policy MigrationPolicy) *Store
- func (s *Store) WithTransaction(ctx context.Context, fn func(tx *OptimisticTransaction) error) error
- type StripedLocks
- type Transaction
- type UniqueConstraint
- type ZapLogger
Constants
const (
	// Index update retry configuration
	DefaultMaxRetries = 3
	DefaultInitialBackoff = 100 * time.Millisecond
	DefaultBackoffMultiple = 2
	DefaultJitterPercent = 0.5 // 50% jitter to avoid thundering herd
	// Batch operation configuration
	DefaultBatchSize = 100
	DefaultListPaginatedSize = 100
	// File backend configuration
	DefaultFilePermissions = 0644
	DefaultDirPermissions = 0755
)
Configuration constants for Smarterbase operations
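The retry constants above describe an exponential backoff with jitter. The sketch below shows one plausible reading of how such values combine into wait times. The constants are local copies of the values listed above; applying the jitter symmetrically around the base delay is an assumption, since the document does not say how the library applies it, and backoffDelay is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Local copies of the documented defaults, for illustration only.
const (
	maxRetries      = 3
	initialBackoff  = 100 * time.Millisecond
	backoffMultiple = 2
	jitterPercent   = 0.5
)

// backoffDelay returns the wait before retry number attempt (0-based).
// The base delay grows geometrically; jitter then shifts it by up to
// plus or minus jitterPercent of the base (an assumed interpretation).
func backoffDelay(attempt int, rnd func() float64) time.Duration {
	base := initialBackoff
	for i := 0; i < attempt; i++ {
		base *= backoffMultiple
	}
	// rnd returns a value in [0, 1); map it to [-1, 1) and scale by the jitter.
	offset := (rnd()*2 - 1) * jitterPercent
	return time.Duration(float64(base) * (1 + offset))
}

func main() {
	for attempt := 0; attempt < maxRetries; attempt++ {
		fmt.Printf("retry %d: wait about %v\n", attempt+1, backoffDelay(attempt, rand.Float64))
	}
}
```

Under this reading the un-jittered delays are 100ms, 200ms, and 400ms, and each actual wait falls within 50% of its base, so clients that failed at the same moment spread out instead of retrying in lockstep.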
const (
	MetricGetSuccess = "smarterbase.get.success"
	MetricGetError = "smarterbase.get.error"
	MetricGetDuration = "smarterbase.get.duration"
	MetricPutSuccess = "smarterbase.put.success"
	MetricPutError = "smarterbase.put.error"
	MetricPutDuration = "smarterbase.put.duration"
	MetricDeleteSuccess = "smarterbase.delete.success"
	MetricDeleteError = "smarterbase.delete.error"
	MetricDeleteDuration = "smarterbase.delete.duration"
	MetricQueryDuration = "smarterbase.query.duration"
	MetricQueryResults = "smarterbase.query.results"
	MetricIndexUpdate = "smarterbase.index.update"
	MetricIndexRetries = "smarterbase.index.retries"
	MetricIndexErrors = "smarterbase.index.errors"
	MetricTransactionSuccess = "smarterbase.transaction.success"
	MetricTransactionConflict = "smarterbase.transaction.conflict"
	MetricTransactionRollback = "smarterbase.transaction.rollback"
	MetricLockAcquired = "smarterbase.lock.acquired"
	MetricLockFailed = "smarterbase.lock.failed"
	MetricLockDuration = "smarterbase.lock.duration"
	MetricLockContention = "smarterbase.lock.contention" // Number of retries needed
	MetricLockTimeout = "smarterbase.lock.timeout" // Locks that timed out
	MetricLockWaitTime = "smarterbase.lock.wait_duration" // Time spent waiting for locks
	MetricLockActive = "smarterbase.lock.active" // Number of active locks
	MetricLockOrphaned = "smarterbase.lock.orphaned" // Orphaned locks detected
	MetricLockCleanup = "smarterbase.lock.cleanup" // Lock cleanup operations
	MetricLockForceRelease = "smarterbase.lock.force_release" // Forced lock releases
	MetricCounterIncrement = "smarterbase.counter.increment" // Counter increments
	MetricCounterSet = "smarterbase.counter.set" // Counter value sets
	MetricCounterDelete = "smarterbase.counter.delete" // Counter deletions
	MetricCounterError = "smarterbase.counter.error" // Counter operation errors
	MetricCounterAuditTotal = "smarterbase.counter.audit.total" // Total counters audited
	MetricCounterAuditInvalid = "smarterbase.counter.audit.invalid" // Invalid counters found
	MetricCounterAuditNegative = "smarterbase.counter.audit.negative" // Negative counters found
	MetricCounterRepair = "smarterbase.counter.repair" // Counter repairs
	// Additional metrics for Prometheus integration
	MetricBackendOps = "smarterbase.backend.ops"
	MetricBackendErrors = "smarterbase.backend.errors"
	MetricBackendLatency = "smarterbase.backend.latency"
	MetricIndexHits = "smarterbase.index.hits"
	MetricIndexMisses = "smarterbase.index.misses"
	MetricCacheHits = "smarterbase.cache.hits"
	MetricCacheMisses = "smarterbase.cache.misses"
	MetricTransactionSize = "smarterbase.transaction.size"
	MetricCacheSize = "smarterbase.cache.size"
)
Common metric names
Variables ΒΆ
var (
// Data errors
ErrNotFound = errors.New("object not found")
ErrAlreadyExists = errors.New("object already exists")
ErrConflict = errors.New("concurrent modification detected")
ErrInvalidData = errors.New("invalid data format")
// Backend errors
ErrTimeout = errors.New("operation timed out")
ErrQuotaExceeded = errors.New("storage quota exceeded")
// Index errors
ErrIndexCorrupted = errors.New("index corrupted, repair needed")
ErrIndexRetries = errors.New("index update retries exhausted")
ErrIndexMismatch = errors.New("index does not match data")
// Lock errors
ErrLockHeld = errors.New("lock already held by another process")
ErrLockTimeout = errors.New("failed to acquire lock within timeout")
ErrLockReleased = errors.New("lock was already released")
ErrLockNotFound = errors.New("lock not found")
ErrInvalidLockKey = errors.New("invalid lock key")
// Transaction errors
ErrTransactionFailed = errors.New("transaction failed")
ErrRollbackFailed = errors.New("transaction rollback failed")
ErrTransactionTimeout = errors.New("transaction timed out")
// Configuration errors
ErrInvalidConfig = errors.New("invalid configuration")
)
Sentinel errors for common conditions
Functions ΒΆ
func AutoRegisterIndexes ΒΆ
func AutoRegisterIndexes( redisIndexer *RedisIndexer, entityType string, example interface{}, ) error
AutoRegisterIndexes automatically registers indexes for a struct type based on struct tags.
Example usage:
type User struct {
Email string `json:"email" sb:"index"`
PlatformID string `json:"platform_id" sb:"index"`
}
AutoRegisterIndexes(redisIndexer, "users", &User{})
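How the library reads these tags internally is not shown in this documentation. As a rough sketch of the general technique, the standard reflect package can discover which fields carry an sb:"index" tag. The helper name indexedFields and the choice to use the JSON name as the index field are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// User mirrors the example struct, plus one field that is not indexed.
type User struct {
	Email      string `json:"email" sb:"index"`
	PlatformID string `json:"platform_id" sb:"index"`
	Name       string `json:"name"`
}

// indexedFields is a hypothetical helper: it returns the JSON names of all
// fields tagged sb:"index" on a struct or a pointer to a struct.
func indexedFields(example interface{}) []string {
	t := reflect.TypeOf(example)
	if t == nil {
		return nil
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil
	}
	var fields []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Tag.Get("sb") != "index" {
			continue
		}
		// Take the JSON name, dropping options such as ",omitempty".
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		if name == "" {
			name = f.Name
		}
		fields = append(fields, name)
	}
	return fields
}

func main() {
	fmt.Println(indexedFields(&User{})) // [email platform_id]
}
```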
func BatchGet ΒΆ
BatchGet retrieves multiple objects by keys with type safety. This eliminates the marshal/unmarshal anti-pattern in BatchGetJSON.
Example:
users, err := smarterbase.BatchGet[User](ctx, store, keys)
func BatchGetWithErrors ΒΆ
func BatchGetWithErrors[T any](ctx context.Context, store *Store, keys []string) ([]*T, map[string]error)
BatchGetWithErrors retrieves multiple objects and returns both results and errors. Use this when you need to know which specific keys failed.
Example:
users, errors := smarterbase.BatchGetWithErrors[User](ctx, store, keys)
for key, err := range errors {
log.Printf("Failed to get %s: %v", key, err)
}
func BatchGetWithFilter ΒΆ
func BatchGetWithFilter[T any]( ctx context.Context, store *Store, keys []string, filter func(*T) bool, ) ([]*T, error)
BatchGetWithFilter loads multiple objects and applies an optional filter. This helper simplifies the common pattern of loading multiple items and filtering them, eliminating 10-15 lines of manual iteration boilerplate.
Behavior:
- Loads all items sequentially (use BatchGet for parallel loading without filtering)
- Applies filter during loading (memory efficient - doesn't load all then filter)
- Continues on individual item errors (doesn't fail entire batch for one error)
- Returns only items that pass the filter
Use this when you need to load multiple items and only keep those matching a condition. For loading all items without filtering, use BatchGet[T] instead (more efficient).
Example with filter:
// Get only active users
activeUsers, err := smarterbase.BatchGetWithFilter[User](
ctx, store, userKeys,
func(u *User) bool { return u.Active },
)
Example without filter (equivalent to BatchGet but with error tolerance):
// Get all items, skip errors
allResults, err := smarterbase.BatchGetWithFilter[Property](ctx, store, keys, nil)
Performance: Sequential loading. For better performance without filtering, use BatchGet[T] which loads in parallel.
See docs/adr/0006-collection-api.md for design rationale.
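The behavior listed for BatchGetWithFilter (sequential loading, filtering while loading, skipping items that fail) can be sketched without the library. Here loadAndFilter and demoLoad are hypothetical names, and the in-memory map stands in for a real store read.

```go
package main

import (
	"errors"
	"fmt"
)

type User struct {
	Name   string
	Active bool
}

// loadAndFilter loads keys one at a time through load, skips keys that
// fail, and keeps only items accepted by filter. A nil filter keeps all.
func loadAndFilter[T any](keys []string, load func(string) (*T, error), filter func(*T) bool) []*T {
	var out []*T
	for _, k := range keys {
		item, err := load(k)
		if err != nil {
			continue // one bad item does not fail the whole batch
		}
		if filter == nil || filter(item) {
			out = append(out, item)
		}
	}
	return out
}

// demoLoad is a hypothetical in-memory loader used only for this sketch.
func demoLoad(key string) (*User, error) {
	data := map[string]*User{
		"users/1": {Name: "alice", Active: true},
		"users/2": {Name: "bob", Active: false},
	}
	u, ok := data[key]
	if !ok {
		return nil, errors.New("not found")
	}
	return u, nil
}

func main() {
	keys := []string{"users/1", "users/2", "users/missing"}
	active := loadAndFilter(keys, demoLoad, func(u *User) bool { return u.Active })
	fmt.Println(len(active), active[0].Name) // 1 alice
}
```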
func DetectCircularCascade ΒΆ
func DetectCircularCascade(cascades map[string][]CascadeSpec) error
DetectCircularCascade detects circular cascade dependencies. This is a helper function to prevent infinite loops in cascade chains.
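The detection algorithm itself is not documented here. One common way to find such cycles is a depth-first search with three node states, sketched below over a plain map of parent to child entity types. The function name findCycle and the simplified edge map (instead of CascadeSpec values) are assumptions.

```go
package main

import "fmt"

// findCycle runs a depth-first search over parent -> children edges and
// returns an error if any entity type can reach itself.
func findCycle(edges map[string][]string) error {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int) // zero value is unvisited
	var visit func(node string) error
	visit = func(node string) error {
		switch state[node] {
		case inProgress:
			return fmt.Errorf("circular cascade detected at %q", node)
		case done:
			return nil
		}
		state[node] = inProgress
		for _, child := range edges[node] {
			if err := visit(child); err != nil {
				return err
			}
		}
		state[node] = done
		return nil
	}
	for node := range edges {
		if err := visit(node); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ok := map[string][]string{"properties": {"areas"}, "areas": {"photos"}}
	bad := map[string][]string{"properties": {"areas"}, "areas": {"properties"}}
	fmt.Println(findCycle(ok) == nil, findCycle(bad) != nil) // true true
}
```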
func ExtractIDFromCascadeKey ΒΆ
ExtractIDFromCascadeKey is a public helper for extracting IDs from keys. This is useful when implementing custom cascade logic.
func ExtractJSONField ΒΆ
func ExtractJSONField(fieldName string) func(objectKey string, data []byte) ([]IndexEntry, error)
ExtractJSONField extracts a simple field from JSON data
func ExtractJSONFieldForConstraint ΒΆ added in v2.0.3
Helper: Extract JSON field for constraint (common pattern)
func ExtractNestedJSONField ΒΆ
func ExtractNestedJSONField(fieldPath ...string) func(objectKey string, data []byte) ([]IndexEntry, error)
ExtractNestedJSONField extracts nested JSON field paths (e.g., "gallery.postcode").
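To illustrate what walking a nested path involves, the sketch below uses only encoding/json. The helper nestedString is hypothetical, returns a plain string rather than []IndexEntry, and assumes the path is passed as separate segments.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nestedString walks path through decoded JSON objects and returns the
// string found at the end, or false if any segment is missing or the
// final value is not a string.
func nestedString(data []byte, path ...string) (string, bool) {
	var cur interface{}
	if err := json.Unmarshal(data, &cur); err != nil {
		return "", false
	}
	for _, seg := range path {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return "", false
		}
		cur, ok = obj[seg]
		if !ok {
			return "", false
		}
	}
	s, ok := cur.(string)
	return s, ok
}

func main() {
	data := []byte(`{"gallery":{"postcode":"2000"}}`)
	fmt.Println(nestedString(data, "gallery", "postcode")) // 2000 true
}
```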
func GetByIndex ΒΆ
func GetByIndex[T any](ctx context.Context, im *IndexManager, entityType, field, value string) (*T, error)
GetByIndex is a convenience wrapper for single-result index queries. Returns the first matching item or an error if not found.
Example:
user, err := smarterbase.GetByIndex[User](ctx, indexManager, "users", "email", "alice@example.com")
func IsConflict ΒΆ
IsConflict checks if an error is a conflict/concurrent modification error
func IsConstraintViolation ΒΆ added in v2.0.3
IsConstraintViolation checks if an error is a constraint violation
func IsNotFound ΒΆ
IsNotFound checks if an error is a "not found" error
func IsPermanent ΒΆ
IsPermanent checks if an error is permanent (not retryable)
func IsRetryable ΒΆ
IsRetryable checks if an error is safe to retry
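The signatures of these predicates are not shown above. A plausible shape, given the sentinel errors in the Variables section, is a thin wrapper over errors.Is so that wrapped errors still match. The lowercase names below are local stand-ins, and treating only timeouts as retryable is an assumption for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errNotFound = errors.New("object not found")
	errTimeout  = errors.New("operation timed out")
)

// isNotFound reports whether err is, or wraps, errNotFound.
func isNotFound(err error) bool { return errors.Is(err, errNotFound) }

// isRetryable treats timeouts as transient. This classification is an
// assumption for the sketch, not the library's documented rule.
func isRetryable(err error) bool { return errors.Is(err, errTimeout) }

func main() {
	wrapped := fmt.Errorf("get users/123: %w", errNotFound)
	fmt.Println(isNotFound(wrapped), isRetryable(wrapped)) // true false
}
```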
func NewID ΒΆ
func NewID() string
NewID generates a UUIDv7 (time-ordered) identifier.
UUIDv7 benefits:
- Sortable by creation time
- Database index friendly
- Distributed system friendly (no coordination needed)
- Can infer creation time from ID
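The creation time can be recovered because a UUIDv7 stores Unix milliseconds in its first 48 bits. The sketch below assumes NewID returns the standard hyphenated text form; uuidV7Time is a hypothetical helper, and the sample value is the UUIDv7 example from RFC 9562.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// uuidV7Time extracts the creation time from a UUIDv7 string. The first
// 48 bits (12 hex digits) hold Unix time in milliseconds.
func uuidV7Time(id string) (time.Time, error) {
	hex := strings.ReplaceAll(id, "-", "")
	if len(hex) != 32 {
		return time.Time{}, fmt.Errorf("invalid UUID length: %d", len(hex))
	}
	if hex[12] != '7' {
		return time.Time{}, fmt.Errorf("not a version 7 UUID")
	}
	ms, err := strconv.ParseInt(hex[:12], 16, 64)
	if err != nil {
		return time.Time{}, err
	}
	return time.UnixMilli(ms).UTC(), nil
}

func main() {
	t, err := uuidV7Time("017f22e2-79b0-7cc3-98c4-dc0c0c07398f")
	fmt.Println(t.UnixMilli(), err) // 1645557742000 <nil>
}
```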
func NormalizeEmail ΒΆ added in v2.0.3
Helper: Normalize email addresses (lowercase, trim)
func NormalizeString ΒΆ added in v2.0.3
Helper: Normalize string (trim whitespace)
func QueryIndexTyped ΒΆ
func QueryIndexTyped[T any](ctx context.Context, im *IndexManager, entityType, field, value string) ([]*T, error)
QueryIndexTyped combines index query and batch fetch with type safety. Returns typed results directly, eliminating the marshal/unmarshal dance.
Example:
users, err := smarterbase.QueryIndexTyped[User](ctx, indexManager, "users", "email", "alice@example.com")
func QueryWithFallback ΒΆ
func QueryWithFallback[T any]( ctx context.Context, store *Store, redisIndexer *RedisIndexer, entityType string, indexField string, indexValue string, scanPrefix string, filter func(*T) bool, ) ([]*T, error)
QueryWithFallback tries a Redis index lookup first, falls back to full scan, and profiles the operation. This helper eliminates the 40-50 line boilerplate pattern seen across production codebases.
Automatic handling:
- Tries Redis index first (O(1) lookup)
- Falls back to full scan if Redis unavailable (O(n))
- Query profiling and complexity tracking
- Index usage metrics
- Graceful degradation (no errors when Redis is down)
Use this for ALL Redis-indexed queries in production code. It provides automatic resilience and observability without manual boilerplate.
Basic usage:
users, err := smarterbase.QueryWithFallback[User](
ctx, store, redisIndexer,
"users", "role", "admin", // Redis index lookup
"users/", // Fallback scan prefix
func(u *User) bool { return u.Role == "admin" }, // Fallback filter
)
Performance: O(1) when Redis available, O(n) fallback. Typical latency: 5-10ms (Redis), 50-200ms (scan).
See docs/adr/0006-collection-api.md for design rationale and examples/production-patterns/ for complete working example.
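The index-first, scan-on-failure control flow can be sketched in isolation. Everything below is hypothetical scaffolding: brokenIndex and workingIndex stand in for a Redis lookup, scanUsers for a prefix scan, and the returned usedIndex flag stands in for the index usage metric.

```go
package main

import (
	"errors"
	"fmt"
)

type User struct {
	ID   string
	Role string
}

// queryWithFallback tries indexLookup first. If it fails, every item from
// scanAll is checked against filter instead.
func queryWithFallback[T any](
	indexLookup func() ([]*T, error),
	scanAll func() []*T,
	filter func(*T) bool,
) (items []*T, usedIndex bool) {
	if found, err := indexLookup(); err == nil {
		return found, true
	}
	for _, item := range scanAll() {
		if filter(item) {
			items = append(items, item)
		}
	}
	return items, false
}

var allUsers = []*User{{ID: "1", Role: "admin"}, {ID: "2", Role: "member"}}

func brokenIndex() ([]*User, error)  { return nil, errors.New("redis unavailable") }
func workingIndex() ([]*User, error) { return allUsers[:1], nil }
func scanUsers() []*User             { return allUsers }
func isAdmin(u *User) bool           { return u.Role == "admin" }

func main() {
	admins, usedIndex := queryWithFallback(brokenIndex, scanUsers, isAdmin)
	fmt.Println(len(admins), usedIndex) // 1 false
}
```

The fallback filter should express the same condition as the index lookup, otherwise the two paths return different results.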
func RedisOptions ΒΆ
RedisOptions returns redis.Options populated from standard environment variables.
Environment variables read (with defaults):
- REDIS_ADDR (default: "localhost:6379")
- REDIS_PASSWORD (default: "")
- REDIS_DB (default: 0)
- REDIS_TLS_ENABLED (default: false, auto-enabled for port 25061)
This is a convenience function for production deployments following 12-factor app principles. It provides sensible defaults for local development while allowing production configuration via environment variables.
Users can still construct redis.Options manually for advanced scenarios (Redis Cluster, Sentinel, custom TLS, connection pools, etc.).
Example usage:
// Simple case - works locally and in production
redisClient := redis.NewClient(smarterbase.RedisOptions())
defer redisClient.Close()
// Production deployment:
// export REDIS_ADDR=redis.prod.example.com:6379
// export REDIS_PASSWORD=secret
// export REDIS_DB=0
// export REDIS_TLS_ENABLED=true
For more complex setups, use redis.Options directly:
redisClient := redis.NewClient(&redis.Options{
Addr: "redis.example.com:6379",
Password: "secret",
TLSConfig: &tls.Config{...},
PoolSize: 100,
})
func RedisOptionsWithOverrides ΒΆ
RedisOptionsWithOverrides returns redis.Options with explicit overrides for common parameters.
This helper is designed for applications that have explicit configuration but want environment variable fallback. Pass empty strings to use environment variables.
Parameters:
- addr: Redis server address (empty = use REDIS_ADDR env var or "localhost:6379")
- password: Redis password (empty = use REDIS_PASSWORD env var)
- poolSize: Connection pool size (0 = use Redis default of 10)
- minIdleConns: Minimum idle connections (0 = use Redis default of 0)
Example - Application config with environment fallback:
opts := smarterbase.RedisOptionsWithOverrides(
cfg.RedisHost, // Use config if present, else env var
cfg.RedisPassword, // Use config if present, else env var
10, // App-specific pool size
5, // App-specific min idle
)
redisClient := redis.NewClient(opts)
Example - Pure environment config:
opts := smarterbase.RedisOptionsWithOverrides("", "", 10, 5)
// Reads REDIS_ADDR and REDIS_PASSWORD from environment
redisClient := redis.NewClient(opts)
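The precedence described in the parameter list (explicit value, then environment variable, then default) amounts to a small resolution function. This is a sketch of that rule only; resolve is a hypothetical helper and the getenv parameter exists so the lookup can be replaced in tests.

```go
package main

import (
	"fmt"
	"os"
)

// resolve returns the explicit override if set, otherwise the value of
// the environment variable envKey, otherwise def.
func resolve(override, envKey, def string, getenv func(string) string) string {
	if override != "" {
		return override
	}
	if v := getenv(envKey); v != "" {
		return v
	}
	return def
}

func main() {
	// With no override, the result depends on the process environment.
	fmt.Println(resolve("", "REDIS_ADDR", "localhost:6379", os.Getenv))
	// An explicit value always wins.
	fmt.Println(resolve("10.0.0.5:6379", "REDIS_ADDR", "localhost:6379", os.Getenv)) // 10.0.0.5:6379
}
```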
func RegisterIndexesForType ΒΆ
func RegisterIndexesForType(cfg IndexConfig, example interface{}, manualIndexes func()) error
RegisterIndexesForType is a convenience function that combines auto-registration with manual index registration for complex cases
func UnmarshalBatchResults ΒΆ
UnmarshalBatchResults converts BatchGetJSON results to typed objects. This is a helper for code that still uses the old BatchGetJSON API.
Example:
results, _ := store.BatchGetJSON(ctx, keys, User{})
users, err := smarterbase.UnmarshalBatchResults[User](results)
func UpdateWithIndexes ΒΆ
func UpdateWithIndexes( ctx context.Context, store *Store, redisIndexer *RedisIndexer, key string, data interface{}, updates []IndexUpdate, ) error
UpdateWithIndexes writes data and then updates all associated Redis indexes in a single call. This prevents the common bug where developers forget to update indexes after modifying indexed fields.
Behavior:
- Writes data to storage first
- Updates all specified indexes (best-effort)
- Logs warnings on index update failures (doesn't fail the operation)
- Handles add, remove, and replace operations automatically
Use this whenever updating fields that are indexed in Redis. It ensures indexes stay in sync with your data without manual coordination.
Example updating an indexed field:
// User changed their email
user.Email = newEmail
err := smarterbase.UpdateWithIndexes(
ctx, store, redisIndexer,
"users/user-123.json", user,
[]smarterbase.IndexUpdate{
{EntityType: "users", IndexField: "email", OldValue: oldEmail, NewValue: newEmail},
},
)
Error handling: Index update failures are logged but don't cause the function to return an error. This ensures application availability even when Redis is temporarily unavailable.
See docs/adr/0006-collection-api.md for design rationale.
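The ordering described under Behavior (data first, indexes second, index failures reported as warnings) can be modeled with two maps. The memStore type, its indexDown switch, and the "field:value" index key layout are all assumptions for this sketch and do not reflect the library's Redis key format.

```go
package main

import "fmt"

// indexUpdate describes moving an object from one index value to another.
type indexUpdate struct {
	Field, OldValue, NewValue string
}

// memStore is a hypothetical in-memory stand-in for storage plus index.
type memStore struct {
	data      map[string]string
	index     map[string]string // "field:value" -> object key
	indexDown bool              // simulates Redis being unavailable
}

func newMemStore() *memStore {
	return &memStore{data: map[string]string{}, index: map[string]string{}}
}

// updateWithIndexes writes data first, then applies index changes on a
// best-effort basis and returns warnings instead of failing.
func (s *memStore) updateWithIndexes(key, value string, updates []indexUpdate) (warnings []string) {
	s.data[key] = value
	for _, u := range updates {
		if s.indexDown {
			warnings = append(warnings, "index update skipped for field "+u.Field)
			continue
		}
		if u.OldValue != "" {
			delete(s.index, u.Field+":"+u.OldValue)
		}
		if u.NewValue != "" {
			s.index[u.Field+":"+u.NewValue] = key
		}
	}
	return warnings
}

func main() {
	s := newMemStore()
	s.index["email:old@example.com"] = "users/user-123.json"
	warnings := s.updateWithIndexes("users/user-123.json", `{"email":"new@example.com"}`,
		[]indexUpdate{{Field: "email", OldValue: "old@example.com", NewValue: "new@example.com"}})
	fmt.Println(s.index["email:new@example.com"], len(warnings)) // users/user-123.json 0
}
```

Because index updates are best-effort, a periodic consistency check or index rebuild is a reasonable complement when Redis outages are possible.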
func ValidateCascadeSpec ΒΆ
func ValidateCascadeSpec(spec CascadeSpec) error
ValidateCascadeSpec validates that a cascade spec is properly configured
func WithAtomicUpdate ΒΆ
func WithAtomicUpdate(ctx context.Context, store *Store, lock *DistributedLock, key string, ttl time.Duration, fn func(ctx context.Context) error) error
WithAtomicUpdate executes a function with distributed lock protection. This ensures that read-modify-write operations are truly atomic across all processes.
USE THIS for critical updates that require isolation:
- Financial transactions (account balance updates)
- Inventory modifications
- Counter increments
- Any read-modify-write that must be atomic
Example:
lock := smarterbase.NewDistributedLock(redisClient, "smarterbase")
err := smarterbase.WithAtomicUpdate(ctx, store, lock, "accounts/123", 10*time.Second,
func(ctx context.Context) error {
var account Account
// Propagate read errors so a failed load never leads to a write
if err := store.GetJSON(ctx, "accounts/123", &account); err != nil {
return err
}
// Safe: No other process can modify this account during this function
account.Balance += 100
return store.PutJSON(ctx, "accounts/123", &account)
})
Performance: Adds 2-5ms latency for lock acquisition (no contention). Under contention: +10-50ms per retry (exponential backoff).
Retries: Automatically retries 3 times with exponential backoff if lock is held.
Metrics: Tracks lock contention, wait time, and timeouts via store.metrics.
func WithContext ΒΆ
WithContext adds context to an error
func WithProfiler ΒΆ
func WithProfiler(ctx context.Context, profiler *QueryProfiler) context.Context
WithProfiler attaches a profiler to the context
Types ΒΆ
type AuditOptions ΒΆ
type AuditOptions struct {
Pattern string // Redis key pattern (e.g., "counter:*")
LargeThreshold int64 // Values above this are flagged as large
CheckNegative bool // Flag negative values as warnings
CheckZero bool // Include zero-value counters in report
}
AuditOptions configures the audit process
func DefaultAuditOptions ΒΆ
func DefaultAuditOptions() *AuditOptions
DefaultAuditOptions returns sensible defaults for counter auditing
type AuditReport ΒΆ
type AuditReport struct {
Timestamp time.Time
TotalCounters int
InvalidCounters []string // Counters with non-integer values
NegativeCounters []string // Counters with negative values (usually unexpected)
LargeCounters []string // Counters with unusually large values
ZeroCounters []string // Counters with zero value
CounterValues map[string]int64
Warnings []string
}
AuditReport contains the results of a counter audit
type Backend ΒΆ
type Backend interface {
// Object operations
Get(ctx context.Context, key string) ([]byte, error)
Put(ctx context.Context, key string, data []byte) error
Delete(ctx context.Context, key string) error
Exists(ctx context.Context, key string) (bool, error)
// Conditional operations (for optimistic locking)
// Returns ETag after successful put
PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
GetWithETag(ctx context.Context, key string) (data []byte, etag string, err error)
// List operations
List(ctx context.Context, prefix string) ([]string, error)
ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
// Streaming (for large files like photos/audio)
GetStream(ctx context.Context, key string) (io.ReadCloser, error)
PutStream(ctx context.Context, key string, reader io.Reader, size int64) error
// Append operations (for JSONL event logs)
// Appends data to existing key, or creates if not exists
Append(ctx context.Context, key string, data []byte) error
// Health check
Ping(ctx context.Context) error
// Resource cleanup
Close() error
}
Backend defines the interface for different storage implementations. This allows SmarterBase to work with S3, local filesystem, or any S3-compatible storage.
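The two conditional methods support an optimistic read-modify-write loop: read the data with its ETag, compute the new value, and write only if the ETag is unchanged. The sketch below uses a hypothetical in-memory type with the context parameter omitted; deriving the ETag from a SHA-256 hash and retrying three times are assumptions, not documented behavior.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("concurrent modification detected")

// memBackend is a hypothetical in-memory store with only the two
// conditional methods from the interface (context omitted for brevity).
type memBackend struct {
	mu   sync.Mutex
	data map[string][]byte
}

// etagOf derives a short content hash to act as the ETag.
func etagOf(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:8])
}

func (m *memBackend) GetWithETag(key string) ([]byte, string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	b := m.data[key]
	return b, etagOf(b), nil
}

func (m *memBackend) PutIfMatch(key string, data []byte, expectedETag string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if etagOf(m.data[key]) != expectedETag {
		return "", errConflict
	}
	m.data[key] = data
	return etagOf(data), nil
}

// appendBang is a read-modify-write loop that retries on conflict.
func appendBang(m *memBackend, key string) error {
	for attempt := 0; attempt < 3; attempt++ {
		cur, etag, err := m.GetWithETag(key)
		if err != nil {
			return err
		}
		next := append(append([]byte{}, cur...), '!')
		if _, err := m.PutIfMatch(key, next, etag); err == nil {
			return nil
		} else if !errors.Is(err, errConflict) {
			return err
		}
	}
	return errConflict
}

func main() {
	m := &memBackend{data: map[string][]byte{"k": []byte("v1")}}
	err := appendBang(m, "k")
	fmt.Println(err, string(m.data["k"])) // <nil> v1!
	_, err = m.PutIfMatch("k", []byte("v2"), "stale-etag")
	fmt.Println(err) // concurrent modification detected
}
```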
func NewGCSBackend ΒΆ
NewGCSBackend creates a new GCS backend
func NewMinIOBackend ΒΆ
func NewMinIOBackend(cfg MinIOConfig) (Backend, error)
NewMinIOBackend creates a new MinIO backend. MinIO is S3-compatible, so this wraps S3Backend with MinIO-specific configuration.
func NewMinIOBackendWithRedisLock ΒΆ
func NewMinIOBackendWithRedisLock(cfg MinIOConfig, redisClient *redis.Client) (Backend, error)
NewMinIOBackendWithRedisLock creates a MinIO backend with distributed locking
type BackendConfig ΒΆ
type BackendConfig struct {
Type string // "s3", "filesystem", "minio", etc.
Bucket string // S3 bucket or base directory
Region string // AWS region (S3 only)
Endpoint string // Custom endpoint (for S3-compatible services)
PathPrefix string // Optional prefix for all keys
Options map[string]string // Backend-specific options
}
BackendConfig holds configuration for any backend
func (BackendConfig) Validate ΒΆ
func (c BackendConfig) Validate() error
Validate checks if the BackendConfig is valid
type BatchOperation ΒΆ
BatchOperation represents a batch operation result
type BatchOperationResult ΒΆ
type BatchOperationResult struct {
Total int
Successful int
Failed int
Errors []BatchOperation
}
BatchOperationResult summarizes the results of a batch operation
func AnalyzeBatchResults ΒΆ
func AnalyzeBatchResults(operations []BatchOperation) *BatchOperationResult
AnalyzeBatchResults analyzes batch operation results
type BatchWriter ΒΆ
type BatchWriter struct {
// contains filtered or unexported fields
}
BatchWriter provides a convenient interface for batching writes with automatic flushing.
Use BatchWriter when you have a stream of writes and want to automatically batch them for better performance. The writer automatically flushes when the batch size is reached.
Basic usage:
writer := store.NewBatchWriter(100) // Flush every 100 items
for _, user := range users {
key := fmt.Sprintf("users/%s", user.ID)
if err := writer.Add(ctx, key, user); err != nil {
log.Printf("Batch write failed: %v", err)
break
}
}
// Flush remaining items
if err := writer.Flush(ctx); err != nil {
log.Printf("Final flush failed: %v", err)
}
With progress tracking:
writer := store.NewBatchWriter(100)
for i, user := range users {
key := fmt.Sprintf("users/%s", user.ID)
if err := writer.Add(ctx, key, user); err != nil {
return fmt.Errorf("failed at user %d: %w", i, err)
}
if (i+1) % 1000 == 0 {
log.Printf("Processed %d/%d users", i+1, len(users))
}
}
return writer.Flush(ctx)
func (*BatchWriter) Add ΒΆ
func (bw *BatchWriter) Add(ctx context.Context, key string, value interface{}) error
Add adds an item to the batch and automatically flushes when the batch size is reached.
Returns an error if the automatic flush fails. On error, the batch is cleared and you should handle the error appropriately (retry, log, abort, etc.).
Example:
for _, item := range items {
if err := writer.Add(ctx, "items/"+item.ID, item); err != nil {
return fmt.Errorf("batch write failed: %w", err)
}
}
func (*BatchWriter) Flush ΒΆ
func (bw *BatchWriter) Flush(ctx context.Context) error
Flush writes all pending items in the batch.
You must call Flush at the end of your batch writing to ensure all items are written. Returns an error if any writes fail.
Example:
defer func() {
if err := writer.Flush(ctx); err != nil {
log.Printf("Final flush failed: %v", err)
}
}()
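The Add and Flush contract described above (flush automatically at the batch size, clear the batch even when the flush fails, flush the remainder at the end) can be sketched as a small buffer type. The batchWriter type below is a simplified stand-in that takes strings and a caller-supplied flushFn; it omits context and JSON encoding.

```go
package main

import "fmt"

// batchWriter buffers items and calls flushFn once size items are pending.
type batchWriter struct {
	size    int
	pending []string
	flushFn func(batch []string) error
}

// Add buffers one item and flushes automatically when the batch is full.
func (w *batchWriter) Add(item string) error {
	w.pending = append(w.pending, item)
	if len(w.pending) >= w.size {
		return w.Flush()
	}
	return nil
}

// Flush writes pending items. The buffer is cleared before the write so
// a failed batch is not retried implicitly on the next Add.
func (w *batchWriter) Flush() error {
	if len(w.pending) == 0 {
		return nil
	}
	batch := w.pending
	w.pending = nil
	return w.flushFn(batch)
}

func main() {
	var flushed []int
	w := &batchWriter{size: 2, flushFn: func(b []string) error {
		flushed = append(flushed, len(b))
		return nil
	}}
	for _, k := range []string{"a", "b", "c"} {
		_ = w.Add(k)
	}
	_ = w.Flush() // writes the final partial batch
	fmt.Println(flushed) // [2 1]
}
```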
type CascadeDeleteFunc ΒΆ
CascadeDeleteFunc is a helper type for delete functions
type CascadeIndexManager ΒΆ
type CascadeIndexManager struct {
*IndexManager
// contains filtered or unexported fields
}
CascadeIndexManager wraps an IndexManager to add cascade delete support
func NewCascadeIndexManager ΒΆ
func NewCascadeIndexManager( base *Store, redisIndexer *RedisIndexer, ) *CascadeIndexManager
NewCascadeIndexManager creates an IndexManager with cascade support
func (*CascadeIndexManager) DeleteWithCascade ΒΆ
func (cim *CascadeIndexManager) DeleteWithCascade( ctx context.Context, parentEntityType string, key string, parentID string, ) error
DeleteWithCascade deletes an entity and all its children
func (*CascadeIndexManager) RegisterCascade ΒΆ
func (cim *CascadeIndexManager) RegisterCascade(parentEntityType string, spec CascadeSpec)
RegisterCascade registers a cascade relationship
func (*CascadeIndexManager) RegisterCascadeChain ΒΆ
func (cim *CascadeIndexManager) RegisterCascadeChain(parentEntityType string, specs []CascadeSpec)
RegisterCascadeChain registers multiple cascades for an entity
type CascadeManager ΒΆ
type CascadeManager struct {
// contains filtered or unexported fields
}
CascadeManager handles declarative cascade delete operations
func NewCascadeManager ΒΆ
func NewCascadeManager(base *Store, indexer *RedisIndexer) *CascadeManager
NewCascadeManager creates a new cascade manager
func (*CascadeManager) ExecuteCascadeDelete ΒΆ
func (cm *CascadeManager) ExecuteCascadeDelete( ctx context.Context, parentEntityType string, parentID string, parentKey string, ) error
ExecuteCascadeDelete deletes all children before deleting the parent entity. Returns error if any child deletion fails (transaction-like behavior).
func (*CascadeManager) GetCascadeTree ΒΆ
func (cm *CascadeManager) GetCascadeTree() map[string][]string
GetCascadeTree returns a human-readable representation of cascade relationships. Useful for debugging and documentation.
func (*CascadeManager) PrintCascadeTree ΒΆ
func (cm *CascadeManager) PrintCascadeTree() string
PrintCascadeTree prints a human-readable cascade tree. Useful for debugging.
func (*CascadeManager) Register ΒΆ
func (cm *CascadeManager) Register(parentEntityType string, spec CascadeSpec)
Register registers a cascade delete relationship.
Example:
cm.Register("properties", CascadeSpec{
ChildEntityType: "areas",
ForeignKeyField: "property_id",
DeleteFunc: store.DeleteArea,
})
func (*CascadeManager) RegisterChain ΒΆ
func (cm *CascadeManager) RegisterChain(parentEntityType string, specs []CascadeSpec)
RegisterChain registers multiple cascade relationships for an entity.
Example:
cm.RegisterChain("properties", []CascadeSpec{
{ChildEntityType: "areas", ForeignKeyField: "property_id", DeleteFunc: store.DeleteArea},
})
type CascadeSpec ΒΆ
type CascadeSpec struct {
ChildEntityType string // e.g., "areas"
ForeignKeyField string // JSON field name in child that references parent (e.g., "property_id")
DeleteFunc func(ctx context.Context, childID string) error
}
CascadeSpec defines a parent-child relationship for cascade deletes
type CircuitBreaker ΒΆ
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker prevents cascading failures when dependencies are unavailable. Implements the circuit breaker pattern with three states: closed, open, half-open.
States:
- Closed: Normal operation, requests pass through
- Open: Dependency failing, requests fail fast without calling dependency
- Half-Open: Testing if dependency recovered, limited requests allowed
Use case: Wrap Redis operations to prevent cascading failures when Redis is down.
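The three states can be sketched as a small state machine. The breaker type below is a simplified, non-thread-safe illustration under stated assumptions (a single trial request in half-open, failure count reset on success); it is not the library's implementation, and errOpen is a local stand-in for whatever error the real Execute returns.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errOpen = errors.New("circuit open")

// breaker is a simplified, non-thread-safe sketch of the three states.
type breaker struct {
	maxFailures  int
	resetTimeout time.Duration
	failures     int
	openedAt     time.Time
	state        string // "closed", "open", or "half-open"
}

func newBreaker(maxFailures int, resetTimeout time.Duration) *breaker {
	return &breaker{maxFailures: maxFailures, resetTimeout: resetTimeout, state: "closed"}
}

func (b *breaker) Execute(fn func() error) error {
	if b.state == "open" {
		if time.Since(b.openedAt) < b.resetTimeout {
			return errOpen // fail fast without calling the dependency
		}
		b.state = "half-open" // timeout elapsed: allow a trial request
	}
	if err := fn(); err != nil {
		b.failures++
		if b.state == "half-open" || b.failures >= b.maxFailures {
			b.state = "open"
			b.openedAt = time.Now()
		}
		return err
	}
	b.failures = 0
	b.state = "closed"
	return nil
}

func failing() error { return errors.New("redis down") }
func healthy() error { return nil }

func main() {
	b := newBreaker(2, 0) // zero timeout so the demo can recover immediately
	_ = b.Execute(failing)
	_ = b.Execute(failing)
	fmt.Println(b.state) // open
	_ = b.Execute(healthy)
	fmt.Println(b.state) // closed
}
```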
func NewCircuitBreaker ΒΆ
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a circuit breaker.
Parameters:
- maxFailures: Number of consecutive failures before opening circuit
- resetTimeout: Duration before transitioning from open to half-open
Example:
cb := NewCircuitBreaker(5, 30*time.Second)
err := cb.Execute(ctx, func() error {
return redisClient.Get(ctx, key).Err()
})
func (*CircuitBreaker) Execute ΒΆ
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error
Execute runs fn if circuit is closed or half-open. Returns ErrBackendUnavailable if circuit is open.
func (*CircuitBreaker) Failures ΒΆ
func (cb *CircuitBreaker) Failures() int
Failures returns the current failure count
func (*CircuitBreaker) Reset ΒΆ
func (cb *CircuitBreaker) Reset()
Reset manually resets the circuit breaker to closed state
func (*CircuitBreaker) State ΒΆ
func (cb *CircuitBreaker) State() string
State returns current circuit breaker state (closed, open, or half-open)
func (*CircuitBreaker) WithStateChangeCallback ΒΆ
func (cb *CircuitBreaker) WithStateChangeCallback(fn func(from, to string)) *CircuitBreaker
WithStateChangeCallback adds a callback for state transitions. Useful for metrics and logging.
type ConstraintManager ΒΆ added in v2.0.3
type ConstraintManager struct {
// contains filtered or unexported fields
}
ConstraintManager handles uniqueness constraints using Redis SET NX operations.
Architecture:
- Uses Redis as "claim registry" for unique keys
- SET NX (Set if Not eXists) provides atomic uniqueness guarantee
- No race conditions possible - Redis handles concurrency
- Rollback support if storage write fails after claim
Key Format: unique:{entity}:{field}:{value} → object_key
Example: unique:users:email:adrian@demandops.com → users/019ab.../profile.json
func NewConstraintManager ΒΆ added in v2.0.3
func NewConstraintManager(redis *redis.Client) *ConstraintManager
NewConstraintManager creates a new constraint manager
func (*ConstraintManager) ClaimUniqueKeys ΒΆ added in v2.0.3
func (cm *ConstraintManager) ClaimUniqueKeys(ctx context.Context, entityType, objectKey string, data interface{}) ([]string, error)
ClaimUniqueKeys atomically claims all unique keys for an entity before storage write.
This is the CRITICAL operation that prevents duplicates:
1. Extract all unique field values from data
2. Use Redis SET NX to atomically claim each key
3. If ANY claim fails → rollback all and return error
4. If ALL succeed → safe to write to storage
Returns:
- claimed keys (for rollback if storage write fails)
- error if any constraint is violated
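The claim-then-rollback flow can be sketched with a map in place of Redis. The registry type and its setNX method are hypothetical stand-ins that mimic SET NX semantics within a single process; they do not provide the cross-process atomicity that Redis does.

```go
package main

import "fmt"

// registry is a hypothetical in-memory stand-in for Redis. setNX mimics
// SET NX by writing only when the key is absent.
type registry map[string]string

func (r registry) setNX(key, value string) bool {
	if _, exists := r[key]; exists {
		return false
	}
	r[key] = value
	return true
}

// claimAll claims every key for objectKey. If any key is already taken,
// it releases the keys claimed so far and returns an error.
func claimAll(r registry, keys []string, objectKey string) ([]string, error) {
	var claimed []string
	for _, k := range keys {
		if !r.setNX(k, objectKey) {
			for _, c := range claimed {
				delete(r, c) // rollback earlier claims
			}
			return nil, fmt.Errorf("constraint violated: %s held by %s", k, r[k])
		}
		claimed = append(claimed, k)
	}
	return claimed, nil
}

func main() {
	r := registry{"unique:users:email:alice@example.com": "users/1"}
	_, err := claimAll(r, []string{
		"unique:users:handle:alice",
		"unique:users:email:alice@example.com",
	}, "users/2")
	fmt.Println(err != nil, len(r)) // true 1
}
```

On success the caller keeps the returned keys so they can be released if the subsequent storage write fails.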
func (*ConstraintManager) RebuildConstraints ΒΆ added in v2.0.3
func (cm *ConstraintManager) RebuildConstraints(ctx context.Context, entityType string, objects map[string]interface{}) error
RebuildConstraints rebuilds all constraint keys from storage.
Useful for:
- Initial setup when adding constraints to existing data
- Recovery after Redis data loss
- Cleanup of stale constraint keys
func (*ConstraintManager) RegisterConstraint ΒΆ added in v2.0.3
func (cm *ConstraintManager) RegisterConstraint(constraint *UniqueConstraint)
RegisterConstraint registers a uniqueness constraint for an entity type
func (*ConstraintManager) ReleaseUniqueKeys ΒΆ added in v2.0.3
func (cm *ConstraintManager) ReleaseUniqueKeys(ctx context.Context, claimedKeys []string) error
ReleaseUniqueKeys releases previously claimed keys (rollback after failed storage write)
func (*ConstraintManager) UpdateUniqueKeys ΒΆ added in v2.0.3
func (cm *ConstraintManager) UpdateUniqueKeys(ctx context.Context, entityType, objectKey string, oldData, newData interface{}) ([]string, error)
UpdateUniqueKeys handles constraint updates when an entity is modified.
Flow:
1. Release old unique keys temporarily
2. Claim new unique keys
3. If claim succeeds, clean up any released keys that weren't reclaimed
4. If claim fails, restore old keys and return error
This ensures atomicity - either all new keys claimed or old keys restored.
func (*ConstraintManager) VerifyConstraint ΒΆ added in v2.0.3
func (cm *ConstraintManager) VerifyConstraint(ctx context.Context, entityType, fieldName, value, expectedKey string) (bool, error)
VerifyConstraint checks if a constraint key exists and points to the correct object. Useful for detecting stale constraint keys.
type ConstraintViolationError ΒΆ added in v2.0.3
type ConstraintViolationError struct {
EntityType string
FieldName string
Value string
ExistingKey string // The object key that already has this value
}
ConstraintViolationError is returned when a uniqueness constraint is violated
func (*ConstraintViolationError) Error ΒΆ added in v2.0.3
func (e *ConstraintViolationError) Error() string
type Counter ΒΆ
type Counter struct {
// contains filtered or unexported fields
}
Counter provides atomic counter operations with Redis backend. Useful for generating sequential IDs, tracking counts, etc.
func NewCounter ΒΆ
NewCounter creates a new Redis-backed atomic counter
type CounterAudit ΒΆ
type CounterAudit struct {
// contains filtered or unexported fields
}
CounterAudit provides auditing and verification of counter values
func NewCounterAudit ΒΆ
func NewCounterAudit(redis *redis.Client, logger Logger, metrics Metrics) *CounterAudit
NewCounterAudit creates a new counter audit utility
func (*CounterAudit) Audit ΒΆ
func (ca *CounterAudit) Audit(ctx context.Context, opts *AuditOptions) (*AuditReport, error)
Audit performs a comprehensive audit of counters
func (*CounterAudit) GetCounterInfo ΒΆ
func (ca *CounterAudit) GetCounterInfo(ctx context.Context, key string) (*CounterInfo, error)
GetCounterInfo retrieves detailed information about a counter
func (*CounterAudit) ListCounters ΒΆ
ListCounters lists all counters matching a pattern
func (*CounterAudit) RepairCounter ΒΆ
RepairCounter attempts to fix a counter by setting it to the suggested value
type CounterInfo ΒΆ
type CounterInfo struct {
Key string
Value int64
LastModified time.Time
MemoryUsage int64 // Memory used in bytes
TTL time.Duration
}
CounterInfo contains information about a counter
type DistributedLock ΒΆ
type DistributedLock struct {
// contains filtered or unexported fields
}
DistributedLock provides Redis-based distributed locking for coordinating operations across multiple processes/servers.
Use cases:
- Filesystem backend with multiple application instances
- Coordinating S3 PutIfMatch operations
- Preventing concurrent modifications to the same resource
func NewDistributedLock ΒΆ
func NewDistributedLock(redis *redis.Client, keyPrefix string) *DistributedLock
NewDistributedLock creates a new distributed lock manager using Redis
func NewDistributedLockWithOwnedClient ΒΆ
func NewDistributedLockWithOwnedClient(redis *redis.Client, keyPrefix string) *DistributedLock
NewDistributedLockWithOwnedClient creates a lock manager that owns the Redis client
func (*DistributedLock) Close ΒΆ
func (dl *DistributedLock) Close() error
Close releases resources held by the distributed lock
func (*DistributedLock) Lock ΒΆ
Lock acquires a distributed lock for the given key. Returns a release function that MUST be called to release the lock.
Example:
release, err := lock.Lock(ctx, "users/123", 5*time.Second)
if err != nil {
return err
}
defer release()
// Critical section - only one process can execute this at a time
user := getUser()
user.Balance += 100
saveUser(user)
func (*DistributedLock) TryLockWithRetry ΒΆ
func (l *DistributedLock) TryLockWithRetry(ctx context.Context, key string, ttl time.Duration, maxRetries int) (func(), error)
TryLockWithRetry attempts to acquire a lock with exponential backoff retry. Useful for handling temporary contention.
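The retry schedule is not specified beyond "exponential backoff". As a minimal sketch of that pattern, the helper below doubles the wait after each failed attempt. The names retryWithBackoff and heldFor, the base delay parameter, and the absence of jitter and context cancellation are all simplifications chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errLockHeld = errors.New("lock already held by another process")

// retryWithBackoff calls try up to maxRetries+1 times, doubling the wait
// after each failed attempt. It returns the number of attempts made.
func retryWithBackoff(try func() error, maxRetries int, base time.Duration) (attempts int, err error) {
	delay := base
	for attempts = 1; ; attempts++ {
		if err = try(); err == nil || attempts > maxRetries {
			return attempts, err
		}
		time.Sleep(delay)
		delay *= 2
	}
}

// heldFor returns a try function that fails n times, then succeeds,
// simulating a lock that another process eventually releases.
func heldFor(n int) func() error {
	return func() error {
		if n > 0 {
			n--
			return errLockHeld
		}
		return nil
	}
}

func main() {
	attempts, err := retryWithBackoff(heldFor(2), 3, time.Millisecond)
	fmt.Println(attempts, err) // 3 <nil>
}
```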
type EncryptionBackend ΒΆ
type EncryptionBackend struct {
Backend
// contains filtered or unexported fields
}
EncryptionBackend wraps any backend with AES-256-GCM encryption at rest.
All data is encrypted before storage and decrypted after retrieval. Uses AES-256-GCM for authenticated encryption with random nonces.
Example:
key := make([]byte, 32) // Generate or load from secrets manager
rand.Read(key)
// NewEncryptionBackend returns (*EncryptionBackend, error)
encryptedBackend, err := smarterbase.NewEncryptionBackend(s3Backend, key)
if err != nil {
return err
}
store := smarterbase.NewStore(encryptedBackend)
func NewEncryptionBackend ΒΆ
func NewEncryptionBackend(backend Backend, key []byte) (*EncryptionBackend, error)
NewEncryptionBackend wraps a backend with AES-256-GCM encryption. Key must be exactly 32 bytes for AES-256.
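For readers unfamiliar with the scheme, AES-256-GCM with a random nonce looks roughly like this in the standard library. The functions seal and open are hypothetical, and storing the nonce as a prefix of the ciphertext is a common convention assumed here, not a statement about this package's on-disk format.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
)

// newGCM builds an AES-256-GCM cipher and rejects keys that are not 32 bytes.
func newGCM(key []byte) (cipher.AEAD, error) {
	if len(key) != 32 {
		return nil, fmt.Errorf("key must be 32 bytes, got %d", len(key))
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal encrypts plaintext and prepends the random nonce to the result.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open splits off the nonce, then decrypts and authenticates the rest.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, fmt.Errorf("ciphertext too short")
	}
	nonce, ct := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ct, nil)
}

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	sealed, _ := seal(key, []byte("hello"))
	plain, err := open(key, sealed)
	fmt.Println(string(plain), err) // hello <nil>
}
```

Because GCM is authenticated, any modification of the stored bytes causes decryption to fail rather than return corrupted data.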
func (*EncryptionBackend) GetStream ΒΆ
func (e *EncryptionBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
GetStream decrypts streaming data
func (*EncryptionBackend) GetWithETag ΒΆ
GetWithETag decrypts data and returns ETag
func (*EncryptionBackend) PutIfMatch ΒΆ
func (e *EncryptionBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch encrypts and stores with optimistic locking
type ErrorWithContext ΒΆ
ErrorWithContext adds additional context to errors for better debugging and logging
func (*ErrorWithContext) Error ΒΆ
func (e *ErrorWithContext) Error() string
func (*ErrorWithContext) Unwrap ΒΆ
func (e *ErrorWithContext) Unwrap() error
type FilesystemBackend ΒΆ
type FilesystemBackend struct {
// contains filtered or unexported fields
}
FilesystemBackend implements Backend using local filesystem
func NewFilesystemBackend ΒΆ
func NewFilesystemBackend(basePath string) *FilesystemBackend
NewFilesystemBackend creates a new filesystem backend with 32 lock stripes
func NewFilesystemBackendWithStripes ΒΆ
func NewFilesystemBackendWithStripes(basePath string, stripes int) *FilesystemBackend
NewFilesystemBackendWithStripes creates a filesystem backend with custom stripe count
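Lock striping is not explained further in this documentation. The usual idea is to hash each key onto a fixed pool of mutexes so that unrelated keys rarely contend while memory stays bounded. The sketch below shows that idea with an FNV hash; the stripedLocks type and the hash choice are assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// stripedLocks maps each key to one of a fixed number of mutexes.
type stripedLocks struct {
	stripes []sync.Mutex
}

func newStripedLocks(n int) *stripedLocks {
	return &stripedLocks{stripes: make([]sync.Mutex, n)}
}

// index hashes key to a stripe number in [0, len(stripes)).
func (s *stripedLocks) index(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(len(s.stripes)))
}

// withLock runs fn while holding the stripe that key hashes to.
func (s *stripedLocks) withLock(key string, fn func()) {
	m := &s.stripes[s.index(key)]
	m.Lock()
	defer m.Unlock()
	fn()
}

func main() {
	locks := newStripedLocks(32)
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			locks.withLock("users/123", func() { counter++ })
		}()
	}
	wg.Wait()
	fmt.Println(counter) // 100
}
```

More stripes reduce false contention between different keys at the cost of a little more memory.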
func (*FilesystemBackend) Append ΒΆ
Append appends data to an existing key or creates it if it doesn't exist
func (*FilesystemBackend) Close
func (b *FilesystemBackend) Close() error
Close releases any resources held by the filesystem backend
func (*FilesystemBackend) Delete
func (b *FilesystemBackend) Delete(ctx context.Context, key string) error
Delete removes the object at the given key from the filesystem
func (*FilesystemBackend) GetStream
func (b *FilesystemBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
GetStream returns a reader for streaming large objects
func (*FilesystemBackend) GetWithETag
GetWithETag retrieves data and its ETag for optimistic locking
func (*FilesystemBackend) ListPaginated
func (b *FilesystemBackend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
ListPaginated streams keys with the given prefix in batches
func (*FilesystemBackend) Ping
func (b *FilesystemBackend) Ping(ctx context.Context) error
Ping checks if the backend is accessible and operational
func (*FilesystemBackend) PutIfMatch
func (b *FilesystemBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch performs a conditional put operation using optimistic locking
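A conditional put can be pictured as compare-and-swap on an ETag. The in-memory sketch below is illustrative only: `memStore`, the SHA-256 content hash used as the ETag, and the rule that an empty expected ETag means "the key must not exist yet" are all assumptions rather than documented backend behavior.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

var errETagMismatch = errors.New("etag mismatch")

// memStore is a hypothetical in-memory stand-in for a Backend.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// etagOf derives an ETag from the content (an assumption for this sketch).
func etagOf(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

// putIfMatch writes data only if the stored object's ETag equals expectedETag.
// An empty expectedETag is treated as "the key must not exist yet".
func (s *memStore) putIfMatch(key string, data []byte, expectedETag string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	current := ""
	if old, ok := s.data[key]; ok {
		current = etagOf(old)
	}
	if current != expectedETag {
		return "", errETagMismatch
	}
	s.data[key] = append([]byte(nil), data...) // store a private copy
	return etagOf(data), nil
}

func main() {
	s := &memStore{data: map[string][]byte{}}
	etag, _ := s.putIfMatch("users/1", []byte(`{"n":1}`), "")

	_, err := s.putIfMatch("users/1", []byte(`{"n":2}`), "stale-etag")
	fmt.Println(err) // etag mismatch

	_, err = s.putIfMatch("users/1", []byte(`{"n":2}`), etag)
	fmt.Println(err) // <nil>
}
```

A caller that gets a mismatch typically re-reads the object, reapplies its change, and retries with the fresh ETag.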
type FilesystemBackendWithRedisLock
type FilesystemBackendWithRedisLock struct {
*FilesystemBackend
// contains filtered or unexported fields
}
FilesystemBackendWithRedisLock wraps FilesystemBackend with Redis-based distributed locking for multi-instance deployments.
func NewFilesystemBackendWithRedisLock
func NewFilesystemBackendWithRedisLock(basePath string, redisClient *redis.Client) *FilesystemBackendWithRedisLock
NewFilesystemBackendWithRedisLock creates a filesystem backend with distributed locking
func (*FilesystemBackendWithRedisLock) Append
Append overrides the base implementation with distributed locking
func (*FilesystemBackendWithRedisLock) PutIfMatch
func (b *FilesystemBackendWithRedisLock) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch overrides the base implementation with distributed locking
type GCSBackend
type GCSBackend struct {
// contains filtered or unexported fields
}
GCSBackend implements Backend using Google Cloud Storage
func (*GCSBackend) Append
Append appends data to an existing GCS object using read-modify-write. GCS doesn't support true append operations, so this reads, combines, and writes back.
func (*GCSBackend) Close
func (b *GCSBackend) Close() error
Close releases any resources held by the GCS backend
func (*GCSBackend) Delete
func (b *GCSBackend) Delete(ctx context.Context, key string) error
Delete removes the object at the given key from GCS
func (*GCSBackend) GetStream
func (b *GCSBackend) GetStream(ctx context.Context, key string) (io.ReadCloser, error)
GetStream returns a reader for streaming large objects from GCS
func (*GCSBackend) GetWithETag
GetWithETag retrieves data and its ETag for optimistic locking from GCS
func (*GCSBackend) ListPaginated
func (b *GCSBackend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
ListPaginated streams keys with the given prefix in batches from GCS
func (*GCSBackend) Ping
func (b *GCSBackend) Ping(ctx context.Context) error
Ping checks if the GCS backend is accessible and operational
func (*GCSBackend) PutIfMatch
func (b *GCSBackend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch provides optimistic locking using GCS preconditions. Unlike S3, GCS supports true conditional writes via generation matching.
type GCSConfig
type GCSConfig struct {
ProjectID string
Bucket string
CredentialsFile string // Path to service account JSON file (optional, uses ADC if empty)
}
GCSConfig contains GCS-specific configuration
type InMemoryMetrics
type InMemoryMetrics struct {
Counters map[string]int
Gauges map[string]float64
Histograms map[string][]float64
Timings map[string][]time.Duration
// contains filtered or unexported fields
}
InMemoryMetrics stores metrics in memory for testing (thread-safe)
func NewInMemoryMetrics
func NewInMemoryMetrics() *InMemoryMetrics
NewInMemoryMetrics creates an in-memory metrics collector for testing
func (*InMemoryMetrics) Gauge
func (m *InMemoryMetrics) Gauge(name string, value float64, tags ...string)
Gauge sets a gauge value in memory
func (*InMemoryMetrics) Histogram
func (m *InMemoryMetrics) Histogram(name string, value float64, tags ...string)
Histogram records a histogram value in memory
func (*InMemoryMetrics) Increment
func (m *InMemoryMetrics) Increment(name string, tags ...string)
Increment increments a counter in memory
type IndexConfig
type IndexConfig struct {
EntityType string
KeyPrefix string
RedisIndexer *RedisIndexer
}
IndexConfig represents configuration for struct-based indexing
type IndexEntry
type IndexEntry struct {
IndexName string // e.g., "user_id", "area_id", "postcode"
IndexValue string // e.g., "user-123", "area-456", "1234AB"
}
IndexEntry represents a single index value for an object
type IndexHealthMonitor
type IndexHealthMonitor struct {
// contains filtered or unexported fields
}
IndexHealthMonitor provides automated health checking and drift detection for Redis indexes.
Purpose:
- Detect when Redis indexes become stale or inconsistent
- Alert on drift before it causes data issues
- Enable automated repair workflows
- Provide visibility into index health
func NewIndexHealthMonitor
func NewIndexHealthMonitor(store *Store, redisIndexer *RedisIndexer) *IndexHealthMonitor
NewIndexHealthMonitor creates a new health monitor with opinionated defaults.
Default configuration:
- checkInterval: 5 minutes (frequent enough to catch issues early)
- sampleSize: 100 objects (good balance of accuracy vs performance)
- driftThreshold: 5.0% (alert and repair if >5% drift detected)
- autoRepair: true (self-healing by default - disable if you need manual control)
These defaults are production-ready and battle-tested. Override only if you have specific needs.
func (*IndexHealthMonitor) Check
func (ihm *IndexHealthMonitor) Check(ctx context.Context, entityType string) (*IndexHealthReport, error)
Check performs a single health check on the specified entity type. If entityType is empty, it checks all registered indexes.
func (*IndexHealthMonitor) RepairDrift
func (ihm *IndexHealthMonitor) RepairDrift(ctx context.Context, report *IndexHealthReport) error
RepairDrift attempts to repair detected index drift. Run it during off-peak hours, as it can be resource-intensive.
func (*IndexHealthMonitor) Start
func (ihm *IndexHealthMonitor) Start(ctx context.Context) error
Start begins automated health checking in the background
func (*IndexHealthMonitor) Stop
func (ihm *IndexHealthMonitor) Stop()
Stop halts the background health checking
func (*IndexHealthMonitor) WithAutoRepair
func (ihm *IndexHealthMonitor) WithAutoRepair(enabled bool) *IndexHealthMonitor
WithAutoRepair configures automatic repair behavior (enabled by default). When enabled, the monitor will automatically call RepairDrift() when drift is detected above the configured threshold.
Auto-repair is ENABLED BY DEFAULT because:
- Self-healing systems are more reliable
- Manual repair is error-prone and slow
- Drift threshold (5%) prevents false positives
- Circuit breaker protects Redis from overload
Disable only if you need manual control (e.g., scheduled maintenance windows):
monitor.WithAutoRepair(false) // Disable for manual control
Resource considerations:
- Repair uses Redis SADD/SREM operations (fast)
- Typically completes in <1 second for 100 objects
- Circuit breaker prevents cascading failures
- Runs in background goroutine (non-blocking)
func (*IndexHealthMonitor) WithDriftThreshold
func (ihm *IndexHealthMonitor) WithDriftThreshold(threshold float64) *IndexHealthMonitor
WithDriftThreshold sets the drift percentage that triggers alerts
func (*IndexHealthMonitor) WithInterval
func (ihm *IndexHealthMonitor) WithInterval(interval time.Duration) *IndexHealthMonitor
WithInterval sets the health check interval
func (*IndexHealthMonitor) WithSampleSize
func (ihm *IndexHealthMonitor) WithSampleSize(size int) *IndexHealthMonitor
WithSampleSize sets the number of objects to sample per check
type IndexHealthReport
type IndexHealthReport struct {
Timestamp time.Time
EntityType string
TotalSampled int
MissingInRedis int
ExtraInRedis int
DriftPercentage float64
MissingKeys []string
ExtraKeys []string
}
IndexHealthReport contains the results of a health check
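The formula behind DriftPercentage is not spelled out in this reference. As a rough sketch, assuming drift is the share of sampled objects that are either missing from Redis or present only in Redis, the check against a threshold might look like this (`healthReport`, `driftPercentage`, and `needsRepair` are hypothetical names):

```go
package main

import "fmt"

// healthReport mirrors the counting fields of IndexHealthReport for this sketch.
type healthReport struct {
	TotalSampled   int
	MissingInRedis int
	ExtraInRedis   int
}

// driftPercentage assumes drift = (missing + extra) / sampled, as a percentage.
func driftPercentage(r healthReport) float64 {
	if r.TotalSampled == 0 {
		return 0 // nothing sampled, so no measurable drift
	}
	return float64(r.MissingInRedis+r.ExtraInRedis) * 100 / float64(r.TotalSampled)
}

// needsRepair reports whether drift strictly exceeds the threshold (e.g., 5.0).
func needsRepair(r healthReport, threshold float64) bool {
	return driftPercentage(r) > threshold
}

func main() {
	r := healthReport{TotalSampled: 100, MissingInRedis: 4, ExtraInRedis: 2}
	fmt.Printf("drift=%.1f%% repair=%v\n", driftPercentage(r), needsRepair(r, 5.0))
}
```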
type IndexManager
type IndexManager struct {
// contains filtered or unexported fields
}
IndexManager coordinates updates across Redis indexes and uniqueness constraints. This provides a single point of coordination to prevent forgotten index updates.
Benefits:
- Automatic updates across all configured indexes
- Atomic uniqueness constraints (prevents duplicates)
- Consistent error handling and logging
- Reduces boilerplate in domain stores
func NewIndexManager
func NewIndexManager(store *Store) *IndexManager
NewIndexManager creates a new index manager
func (*IndexManager) Create
func (im *IndexManager) Create(ctx context.Context, key string, data interface{}) error
Create stores data and updates all indexes atomically
CRITICAL: Enforces uniqueness constraints BEFORE writing to storage. If any unique field (email, platform_user_id, etc.) already exists, this will fail with ConstraintViolationError - preventing duplicates.
func (*IndexManager) Delete
func (im *IndexManager) Delete(ctx context.Context, key string) error
Delete removes data and cleans up all indexes and constraints
func (*IndexManager) Get
func (im *IndexManager) Get(ctx context.Context, key string, dest interface{}) error
Get retrieves data without index updates
func (*IndexManager) Update
func (im *IndexManager) Update(ctx context.Context, key string, newData interface{}) error
Update replaces data and updates all indexes
Handles uniqueness constraints atomically:
1. Claims new unique values (if changed)
2. Writes to storage
3. Releases old unique values
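The claim, write, release ordering can be sketched with a plain map standing in for the Redis-backed constraint store. Everything here (`uniqueIndex`, `updateUnique`, the error value) is hypothetical, and the map is not safe for concurrent use; the point is only the order of the three steps and the cleanup when the write fails.

```go
package main

import (
	"errors"
	"fmt"
)

var errConstraintViolation = errors.New("unique value already claimed")

// uniqueIndex maps a unique value (e.g., an email) to the object key that owns it.
type uniqueIndex struct {
	owners map[string]string
}

// claim reserves value for key and fails if another key already owns it.
func (u *uniqueIndex) claim(value, key string) error {
	if owner, ok := u.owners[value]; ok && owner != key {
		return errConstraintViolation
	}
	u.owners[value] = key
	return nil
}

// release frees value, but only if key is the current owner.
func (u *uniqueIndex) release(value, key string) {
	if u.owners[value] == key {
		delete(u.owners, value)
	}
}

// updateUnique follows the documented order: claim new, write, release old.
func updateUnique(u *uniqueIndex, key, oldValue, newValue string, write func() error) error {
	changed := oldValue != newValue
	if changed {
		if err := u.claim(newValue, key); err != nil {
			return err // nothing has been written yet
		}
	}
	if err := write(); err != nil {
		if changed {
			u.release(newValue, key) // undo the claim so the value stays available
		}
		return err
	}
	if changed {
		u.release(oldValue, key)
	}
	return nil
}

func main() {
	u := &uniqueIndex{owners: map[string]string{"alice@example.com": "users/1"}}
	err := updateUnique(u, "users/1", "alice@example.com", "alice@new.example.com",
		func() error { return nil })
	fmt.Println(err, u.owners)
}
```

Claiming before writing means a duplicate is rejected before storage is touched, and releasing the old value last means the object never sits in storage without a claim on its current value.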
func (*IndexManager) WithConstraintManager (added in v2.0.3)
func (im *IndexManager) WithConstraintManager(manager *ConstraintManager) *IndexManager
WithConstraintManager adds uniqueness constraint enforcement
func (*IndexManager) WithRedisIndexer
func (im *IndexManager) WithRedisIndexer(indexer *RedisIndexer) *IndexManager
WithRedisIndexer adds Redis-based indexing
type IndexRepairService
type IndexRepairService struct {
// contains filtered or unexported fields
}
IndexRepairService provides utilities for validating and repairing indexes. It works with any Backend implementation (S3, filesystem, etc.).
func NewIndexRepairService
func NewIndexRepairService(backend Backend) *IndexRepairService
NewIndexRepairService creates a new index repair service
func (*IndexRepairService) ValidateAndRepairIndexes
func (r *IndexRepairService) ValidateAndRepairIndexes(
	ctx context.Context,
	dataPrefix string,
	indexPrefix string,
	dataFilter func(key string) bool,
	extractFunc func(data []byte) (map[string]string, error),
	createIndexFunc func(ctx context.Context, itemID, parentID string) error,
) (*RepairReport, error)
ValidateAndRepairIndexes checks and repairs reverse indexes.
- dataPrefix: prefix for data objects (e.g., "projects/")
- indexPrefix: prefix for index objects (e.g., "indexes/photo-")
- extractFunc: function to extract items from data objects
type IndexTag
type IndexTag struct {
Type string // "unique" or "multi"
Name string // index name (auto-generated if not provided)
Optional bool // if true, empty values don't error
}
IndexTag represents a parsed struct tag for automatic indexing.
Usage:
Field string `json:"email" sb:"index:unique,name:users-by-email"`
func ParseIndexTag
ParseIndexTag parses a struct tag for indexing configuration. Supported formats:
- sb:"index" or sb:"index,multi" - creates Redis multi-index
- sb:"index,name:custom-name" - with custom name
- sb:"index,optional" - allows empty values
Note: "unique" indexes are no longer supported. Use Redis multi-indexes only.
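The listed formats suggest a comma-separated grammar. The parser below is a hypothetical reconstruction for illustration, not the library's ParseIndexTag: `indexTag`, `parseTag`, and the choice to reject anything other than a leading "index" (including "index:unique") are assumptions based on the formats and the note above.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// indexTag is a local stand-in for the documented IndexTag fields.
type indexTag struct {
	Type     string
	Name     string
	Optional bool
}

// parseTag parses the value of an `sb:"..."` struct tag.
func parseTag(tag string) (*indexTag, error) {
	parts := strings.Split(tag, ",")
	if strings.TrimSpace(parts[0]) != "index" {
		return nil, fmt.Errorf("unsupported tag %q: must start with \"index\"", tag)
	}
	t := &indexTag{Type: "multi"} // multi is the only supported index type
	for _, p := range parts[1:] {
		p = strings.TrimSpace(p)
		switch {
		case p == "multi":
			// already the default
		case p == "optional":
			t.Optional = true
		case strings.HasPrefix(p, "name:"):
			t.Name = strings.TrimPrefix(p, "name:")
		default:
			return nil, fmt.Errorf("unsupported option %q", p)
		}
	}
	return t, nil
}

// user is an example type carrying sb tags.
type user struct {
	Email string `json:"email" sb:"index,name:users-by-email"`
	Phone string `json:"phone" sb:"index,optional"`
}

func main() {
	typ := reflect.TypeOf(user{})
	for i := 0; i < typ.NumField(); i++ {
		f := typ.Field(i)
		tag, err := parseTag(f.Tag.Get("sb"))
		if err != nil {
			fmt.Println(f.Name, "error:", err)
			continue
		}
		fmt.Printf("%s: %+v\n", f.Name, *tag)
	}
}
```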
type IndexUpdate
type IndexUpdate struct {
EntityType string // e.g., "users"
IndexField string // e.g., "email"
OldValue string // Old index value (to remove)
NewValue string // New index value (to add)
}
IndexUpdate represents a single index update operation for UpdateWithIndexes.
Fields:
- EntityType: The type of entity being updated (e.g., "users", "orders")
- IndexField: The field being indexed (e.g., "email", "status")
- OldValue: Previous value to remove from index (empty string if adding new)
- NewValue: New value to add to index (empty string if removing only)
type KeyBuilder
type KeyBuilder struct {
// Prefix is the namespace prefix (e.g., "users", "orders")
Prefix string
// Suffix is the file extension (e.g., ".json", ".jsonl")
// Optional - defaults to empty string
Suffix string
}
KeyBuilder helps construct consistent storage keys. Eliminates error-prone fmt.Sprintf calls scattered throughout code.
Example:
kb := KeyBuilder{Prefix: "users", Suffix: ".json"}
key := kb.Key(userID) // Returns "users/userID.json"
func (KeyBuilder) Key
func (kb KeyBuilder) Key(id string) string
Key constructs a storage key from an ID.
func (KeyBuilder) Keys
func (kb KeyBuilder) Keys(ids []string) []string
Keys constructs multiple storage keys from IDs.
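Judging from the documented example, where a prefix of "users" and a suffix of ".json" yield "users/userID.json", the behavior is probably equivalent to the following sketch. The local `keyBuilder` type is a stand-in, and the "/" separator is inferred from that example rather than confirmed.

```go
package main

import "fmt"

// keyBuilder is a local stand-in for the documented KeyBuilder.
type keyBuilder struct {
	Prefix string
	Suffix string
}

// Key joins prefix, ID, and suffix; the "/" separator is inferred from the docs example.
func (kb keyBuilder) Key(id string) string {
	return kb.Prefix + "/" + id + kb.Suffix
}

// Keys applies Key to every ID, preserving order.
func (kb keyBuilder) Keys(ids []string) []string {
	keys := make([]string, len(ids))
	for i, id := range ids {
		keys[i] = kb.Key(id)
	}
	return keys
}

func main() {
	kb := keyBuilder{Prefix: "users", Suffix: ".json"}
	fmt.Println(kb.Key("123"))                // users/123.json
	fmt.Println(kb.Keys([]string{"a", "b"})) // [users/a.json users/b.json]
}
```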
type LockInfo
type LockInfo struct {
Key string // The resource key being locked
LockKey string // The Redis key for the lock
Value string // The lock value (timestamp or unique ID)
TTL time.Duration // Remaining TTL
AcquiredAt time.Time // When the lock was acquired (derived from value if timestamp)
}
LockInfo contains information about an active lock
type LockManager
type LockManager struct {
// contains filtered or unexported fields
}
LockManager provides utilities for managing and cleaning up distributed locks
func NewLockManager
func NewLockManager(redis *redis.Client, keyPrefix string, logger Logger, metrics Metrics) *LockManager
NewLockManager creates a new lock manager for administrative operations
func (*LockManager) CleanupOrphanedLocks
CleanupOrphanedLocks removes locks older than the specified age
Orphaned locks occur when:
- Application crashes before releasing lock
- Network partition during lock release
- Process killed with SIGKILL
Safety: Only removes locks if their TTL is less than minTTL. This prevents removing locks that are still legitimately held.
Example:
// Clean up locks that have been held for more than 5 minutes
// (assuming default TTL is 30 seconds, anything still locked after 5min is orphaned)
removed, err := lockManager.CleanupOrphanedLocks(ctx, 5*time.Minute)
if err != nil {
return err
}
fmt.Printf("Cleaned up %d orphaned locks\n", removed)
func (*LockManager) ForceRelease
func (lm *LockManager) ForceRelease(ctx context.Context, resourceKey string) error
ForceRelease forcefully releases a specific lock
⚠️ USE WITH CAUTION: Only use when you're certain the lock holder has crashed
Example:
// Force release a stuck lock
err := lockManager.ForceRelease(ctx, "users/123")
if err != nil {
return fmt.Errorf("failed to force release lock: %w", err)
}
func (*LockManager) GetLockInfo
GetLockInfo retrieves information about a specific lock
func (*LockManager) ListLocks
func (lm *LockManager) ListLocks(ctx context.Context) ([]LockInfo, error)
ListLocks returns all active locks matching the key prefix
Example:
locks, err := lockManager.ListLocks(ctx)
if err != nil {
	return err
}
for _, lock := range locks {
	fmt.Printf("Lock: %s, TTL: %s, Age: %s\n",
		lock.Key,
		lock.TTL,
		time.Since(lock.AcquiredAt))
}
type Logger
type Logger interface {
Debug(msg string, fields ...interface{})
Info(msg string, fields ...interface{})
Warn(msg string, fields ...interface{})
Error(msg string, fields ...interface{})
}
Logger provides structured logging for Smarterbase operations
type MethodStats
type MethodStats struct {
Count int
TotalDuration time.Duration
AverageDuration time.Duration
MaxDuration time.Duration
MinDuration time.Duration
FullScans int
Fallbacks int
}
MethodStats tracks statistics for a specific query method
type Metrics
type Metrics interface {
// Increment increases a counter by 1
Increment(name string, tags ...string)
// Gauge sets an absolute value
Gauge(name string, value float64, tags ...string)
// Histogram records a value distribution (latency, size, etc)
Histogram(name string, value float64, tags ...string)
// Timing records a duration
Timing(name string, duration time.Duration, tags ...string)
}
Metrics provides observability for Smarterbase operations
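Any type with these four methods satisfies the interface. The sketch below redeclares the interface locally so it compiles on its own and pairs it with a small mutex-guarded collector; `memMetrics` is hypothetical, and it ignores tags for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Metrics mirrors the interface documented above.
type Metrics interface {
	Increment(name string, tags ...string)
	Gauge(name string, value float64, tags ...string)
	Histogram(name string, value float64, tags ...string)
	Timing(name string, duration time.Duration, tags ...string)
}

// memMetrics is a hypothetical thread-safe collector that ignores tags.
type memMetrics struct {
	mu         sync.Mutex
	counters   map[string]int
	gauges     map[string]float64
	histograms map[string][]float64
	timings    map[string][]time.Duration
}

func newMemMetrics() *memMetrics {
	return &memMetrics{
		counters:   map[string]int{},
		gauges:     map[string]float64{},
		histograms: map[string][]float64{},
		timings:    map[string][]time.Duration{},
	}
}

func (m *memMetrics) Increment(name string, tags ...string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counters[name]++
}

func (m *memMetrics) Gauge(name string, value float64, tags ...string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.gauges[name] = value // a gauge keeps only the latest value
}

func (m *memMetrics) Histogram(name string, value float64, tags ...string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.histograms[name] = append(m.histograms[name], value)
}

func (m *memMetrics) Timing(name string, duration time.Duration, tags ...string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.timings[name] = append(m.timings[name], duration)
}

// Compile-time check that memMetrics satisfies Metrics.
var _ Metrics = (*memMetrics)(nil)

func main() {
	m := newMemMetrics()
	start := time.Now()
	m.Increment("store.get", "backend:fs")
	m.Timing("store.get.duration", time.Since(start))
	fmt.Println(m.counters["store.get"], len(m.timings["store.get.duration"]))
}
```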
type MetricsExporter
type MetricsExporter struct {
// contains filtered or unexported fields
}
MetricsExporter exports query profiler metrics to a MetricsRecorder (e.g., Prometheus)
func NewMetricsExporter
func NewMetricsExporter(profiler *QueryProfiler, recorder MetricsRecorder, interval time.Duration) *MetricsExporter
NewMetricsExporter creates a new metrics exporter
func (*MetricsExporter) ExportOnce
func (e *MetricsExporter) ExportOnce()
ExportOnce exports metrics once (useful for testing or manual export)
func (*MetricsExporter) Start
func (e *MetricsExporter) Start(ctx context.Context)
Start begins exporting metrics periodically
type MetricsRecorder
type MetricsRecorder interface {
RecordQueryProfile(method string, complexity string, duration float64, storageOps int, resultCount int, isFullScan bool, isFallback bool, indexUsed string)
}
MetricsRecorder is an interface for recording query metrics. This allows smarterbase to be decoupled from specific metrics implementations.
type MigrationBuilder
type MigrationBuilder struct {
// contains filtered or unexported fields
}
MigrationBuilder provides a fluent API for registering migrations
func Migrate
func Migrate(typeName string) *MigrationBuilder
Migrate starts building a migration for a type.
Migrations enable schema evolution without downtime. When data is read from storage, it is automatically migrated if its version doesn't match the expected version in the destination struct.
RECOMMENDED: Use WithTypeSafe() for type-safe migrations with concrete types:
// Define a pure, type-safe migration function
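func migrateUserV0ToV2(old UserV0) (UserV2, error) {
	parts := strings.Fields(old.Name)
	if len(parts) == 0 {
		// Guard against an empty name; parts[0] would panic otherwise
		return UserV2{}, fmt.Errorf("cannot split empty name")
	}
	return UserV2{
		V:         2,
		FirstName: parts[0],
		LastName:  strings.Join(parts[1:], " "),
		Email:     old.Email,
	}, nil
}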
// Register with zero boilerplate
smarterbase.WithTypeSafe(
smarterbase.Migrate("User").From(0).To(2),
migrateUserV0ToV2,
)
Helper methods for simple transformations:
// Split a field into multiple fields
smarterbase.Migrate("User").From(0).To(1).
Split("name", " ", "first_name", "last_name")
// Add a new field with default value
smarterbase.Migrate("User").From(1).To(2).AddField("phone", "")
// Rename a field
smarterbase.Migrate("Order").From(2).To(3).
RenameField("price", "total_amount")
// Remove a deprecated field
smarterbase.Migrate("Config").From(3).To(4).
RemoveField("legacy_flag")
Migration chaining - automatically finds shortest path:
smarterbase.Migrate("Product").From(0).To(1).AddField("sku", "")
smarterbase.Migrate("Product").From(1).To(2).Split("name", " ", "brand", "product_name")
smarterbase.WithTypeSafe(smarterbase.Migrate("Product").From(2).To(3), customMigrate)
// Reading v0 data with v3 struct → automatically runs 0→1→2→3
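Finding the shortest chain of migrations is a shortest-path search over a graph whose nodes are versions and whose edges are registered migrations. The registry's internals are not shown in this reference, so the breadth-first search below (`edge` and `shortestPath` are hypothetical names) only illustrates the idea.

```go
package main

import "fmt"

// edge represents one registered migration: from version -> to version.
type edge struct{ from, to int }

// shortestPath returns the version sequence that reaches `to` from `from`
// using the fewest migrations, or nil if no chain exists.
func shortestPath(edges []edge, from, to int) []int {
	if from == to {
		return []int{from}
	}
	next := map[int][]int{}
	for _, e := range edges {
		next[e.from] = append(next[e.from], e.to)
	}
	prev := map[int]int{from: from} // doubles as the visited set
	queue := []int{from}
	for len(queue) > 0 {
		v := queue[0]
		queue = queue[1:]
		for _, n := range next[v] {
			if _, seen := prev[n]; seen {
				continue
			}
			prev[n] = v
			if n == to {
				// Walk the predecessor links back to the start.
				path := []int{to}
				for cur := to; cur != from; {
					cur = prev[cur]
					path = append([]int{cur}, path...)
				}
				return path
			}
			queue = append(queue, n)
		}
	}
	return nil
}

func main() {
	edges := []edge{{0, 1}, {1, 2}, {2, 3}}
	fmt.Println(shortestPath(edges, 0, 3)) // [0 1 2 3]

	// A direct 0->2 migration shortens the chain.
	edges = append(edges, edge{0, 2})
	fmt.Println(shortestPath(edges, 0, 3)) // [0 2 3]
}
```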
Migration policies:
// Default: Migrate in memory only (no write-back)
store := smarterbase.NewStore(backend)
// Write-back policy: Gradually upgrade stored data
store.WithMigrationPolicy(smarterbase.MigrateAndWrite)
The typeName parameter must match the struct's type name (not the JSON field name). For example, if you have "type UserV2 struct {...}", use "UserV2" as the typeName.
See docs/adr/0007-type-safe-migrations.md for implementation details and testing examples.
func WithTypeSafe
func WithTypeSafe[From any, To any](b *MigrationBuilder, migrateFn func(From) (To, error)) *MigrationBuilder
WithTypeSafe registers a type-safe migration function.
This is the RECOMMENDED way to write migrations. Instead of working with map[string]interface{}, you write a pure function that transforms concrete types. This provides full type safety, IDE autocomplete, and compile-time error checking.
Example:
// Define your migration as a pure, type-safe function
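func migrateUserV0ToV2(old UserV0) (UserV2, error) {
	parts := strings.Fields(old.Name)
	if len(parts) == 0 {
		// Guard against an empty name; parts[0] would panic otherwise
		return UserV2{}, fmt.Errorf("cannot split empty name")
	}
	return UserV2{
		V:         2,
		FirstName: parts[0],
		LastName:  strings.Join(parts[1:], " "),
		Email:     old.Email,
	}, nil
}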
// Register it with zero boilerplate
smarterbase.WithTypeSafe(
	smarterbase.Migrate("User").From(0).To(2),
	migrateUserV0ToV2,
)
Benefits over Do():
- Full type safety - no map[string]interface{}
- Compiler catches errors at build time
- IDE autocomplete works
- Easy to unit test in isolation
- Self-documenting with concrete types
- Refactoring tools work correctly
func (*MigrationBuilder) AddField
func (b *MigrationBuilder) AddField(field string, defaultValue interface{}) *MigrationBuilder
AddField adds a new field with a default value.
Use this when introducing new required fields to your schema. The default value is only added if the field doesn't already exist in the data.
Examples:
// Add a phone field with empty string default
smarterbase.Migrate("User").From(0).To(1).
AddField("phone", "")
// Add an inventory count with zero default
smarterbase.Migrate("Product").From(1).To(2).
AddField("stock_count", 0)
// Add a boolean flag with false default
smarterbase.Migrate("Config").From(2).To(3).
AddField("enabled", false)
// Before: {"id": "123", "name": "Product"}
// After: {"id": "123", "name": "Product", "stock_count": 0, "_v": 2}
func (*MigrationBuilder) Do
func (b *MigrationBuilder) Do(fn MigrationFunc) *MigrationBuilder
Do registers a custom migration function
func (*MigrationBuilder) From
func (b *MigrationBuilder) From(version int) *MigrationBuilder
From sets the source version
func (*MigrationBuilder) RemoveField
func (b *MigrationBuilder) RemoveField(field string) *MigrationBuilder
RemoveField removes a deprecated field from the data.
Use this to clean up old fields that are no longer needed in your schema.
Examples:
// Remove a legacy flag that's no longer used
smarterbase.Migrate("Config").From(1).To(2).
RemoveField("legacy_feature_flag")
// Remove temporary migration field
smarterbase.Migrate("User").From(2).To(3).
RemoveField("migration_temp_field")
// Before: {"id": "123", "name": "User", "legacy_flag": true}
// After: {"id": "123", "name": "User", "_v": 2}
func (*MigrationBuilder) RenameField
func (b *MigrationBuilder) RenameField(oldName, newName string) *MigrationBuilder
RenameField renames a field while preserving its value.
Use this when you want to change a field name for clarity or consistency. The old field is removed and its value is copied to the new field name.
Examples:
// Rename price to total_amount
smarterbase.Migrate("Order").From(0).To(1).
RenameField("price", "total_amount")
// Rename created to created_at for consistency
smarterbase.Migrate("Document").From(1).To(2).
RenameField("created", "created_at")
// Before: {"id": "123", "price": 99.99}
// After: {"id": "123", "total_amount": 99.99, "_v": 1}
func (*MigrationBuilder) Split
func (b *MigrationBuilder) Split(sourceField, delimiter string, targetFields ...string) *MigrationBuilder
Split is a helper that splits a field by delimiter into multiple fields.
Common use case: splitting a full name into first and last names.
Example:
// Split "name" field by space into "first_name" and "last_name"
smarterbase.Migrate("User").From(0).To(1).
Split("name", " ", "first_name", "last_name")
// Before: {"name": "Alice Smith"}
// After: {"first_name": "Alice", "last_name": "Smith", "_v": 1}
If the source field contains fewer parts than target fields, remaining fields are set to empty strings. The source field is removed after splitting.
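The documented behavior (pad missing parts with empty strings, remove the source field) can be sketched on a plain map. The `splitField` helper is hypothetical. How surplus parts are handled is not stated here; this sketch assumes the last target field absorbs the remainder, and it leaves out the "_v" version bump.

```go
package main

import (
	"fmt"
	"strings"
)

// splitField splits data[source] by delim into the target fields.
// Missing parts become empty strings and the source field is removed.
// Assumption: surplus parts stay in the last target field.
func splitField(data map[string]interface{}, source, delim string, targets ...string) map[string]interface{} {
	s, _ := data[source].(string) // non-string or missing source is treated as ""
	var parts []string
	if s != "" && len(targets) > 0 {
		parts = strings.SplitN(s, delim, len(targets))
	}
	delete(data, source)
	for i, t := range targets {
		if i < len(parts) {
			data[t] = parts[i]
		} else {
			data[t] = ""
		}
	}
	return data
}

func main() {
	user := map[string]interface{}{"name": "Alice Smith"}
	fmt.Println(splitField(user, "name", " ", "first_name", "last_name"))

	short := map[string]interface{}{"name": "Alice"}
	fmt.Println(splitField(short, "name", " ", "first_name", "last_name"))
}
```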
func (*MigrationBuilder) To
func (b *MigrationBuilder) To(version int) *MigrationBuilder
To sets the target version
type MigrationFunc
MigrationFunc transforms data from one version to another.
The function receives the JSON data as a map[string]interface{} and must return the transformed data. It should set the "_v" field to the target version.
Example custom migration:
smarterbase.Migrate("Product").From(1).To(2).Do(func(data map[string]interface{}) (map[string]interface{}, error) {
// Convert price to cents
if price, ok := data["price"].(float64); ok {
data["price_cents"] = int(math.Round(price * 100)) // round first: price*100 can land just below the integer
delete(data, "price")
}
data["_v"] = 2
return data, nil
})
type MigrationPolicy
type MigrationPolicy int
MigrationPolicy defines how migrations are applied when data is read from storage.
The policy determines whether migrated data should be written back to storage or kept only in memory.
const (
	// MigrateOnRead only migrates data in memory without writing back to storage (default).
	//
	// Use this policy for:
	// - Production environments where you want to test migrations without modifying data
	// - Read-heavy workloads where write-back would add unnecessary latency
	// - Scenarios where you want to defer data upgrades
	//
	// Example:
	//
	//	store := smarterbase.NewStore(backend)
	//	// Data is migrated when read but not written back
	//	store.GetJSON(ctx, "users/123", &user)
	MigrateOnRead MigrationPolicy = iota

	// MigrateAndWrite migrates data and writes it back to storage with the new version.
	//
	// Use this policy for:
	// - Gradual data upgrades during low-traffic periods
	// - Ensuring all data is eventually upgraded to the latest version
	// - When you want to measure migration success rates before forcing upgrades
	//
	// Example:
	//
	//	store := smarterbase.NewStore(backend)
	//	store.WithMigrationPolicy(smarterbase.MigrateAndWrite)
	//	// Data is migrated and written back to storage with updated version
	//	store.GetJSON(ctx, "users/123", &user)
	//
	// Performance note: Write-back adds latency (~10-50ms depending on backend)
	// but ensures data is upgraded over time as it's accessed.
	MigrateAndWrite
)
type MigrationRegistry
type MigrationRegistry struct {
// contains filtered or unexported fields
}
MigrationRegistry manages schema migrations
func (*MigrationRegistry) HasMigrations
func (r *MigrationRegistry) HasMigrations() bool
HasMigrations checks if any migrations are registered
func (*MigrationRegistry) Register
func (r *MigrationRegistry) Register(typeName string, fromVersion, toVersion int, fn MigrationFunc)
Register adds a migration to the registry
type MinIOConfig
type MinIOConfig struct {
Endpoint string // e.g., "localhost:9000" or "minio.example.com"
AccessKeyID string
SecretAccessKey string
UseSSL bool // Whether to use HTTPS (default: false for localhost)
Bucket string
}
MinIOConfig contains MinIO-specific configuration
type MultiIndexSpec
type MultiIndexSpec struct {
Name string // e.g., "sessions-by-user-id"
EntityType string // e.g., "sessions" (for key namespacing)
ExtractFunc func(objectKey string, data []byte) ([]IndexEntry, error) // Extract index values from object
TTL time.Duration // Optional TTL for index keys (0 = no expiry)
}
MultiIndexSpec defines a multi-value secondary index
type NoOpLogger
type NoOpLogger struct{}
NoOpLogger is a logger that does nothing
func (*NoOpLogger) Debug
func (l *NoOpLogger) Debug(msg string, fields ...interface{})
Debug logs a debug message (no-op implementation)
func (*NoOpLogger) Error
func (l *NoOpLogger) Error(msg string, fields ...interface{})
func (*NoOpLogger) Info
func (l *NoOpLogger) Info(msg string, fields ...interface{})
Info logs an info message (no-op implementation)
func (*NoOpLogger) Warn
func (l *NoOpLogger) Warn(msg string, fields ...interface{})
Warn logs a warning message (no-op implementation)
type NoOpMetrics
type NoOpMetrics struct{}
NoOpMetrics is a metrics collector that does nothing
func (*NoOpMetrics) Gauge
func (m *NoOpMetrics) Gauge(name string, value float64, tags ...string)
Gauge sets a gauge value (no-op implementation)
func (*NoOpMetrics) Histogram
func (m *NoOpMetrics) Histogram(name string, value float64, tags ...string)
Histogram records a histogram value (no-op implementation)
func (*NoOpMetrics) Increment
func (m *NoOpMetrics) Increment(name string, tags ...string)
Increment increments a counter (no-op implementation)
type OptimisticTransaction
type OptimisticTransaction struct {
// contains filtered or unexported fields
}
OptimisticTransaction provides best-effort transactional semantics using optimistic locking.
⚠️ IMPORTANT LIMITATIONS:
- This is NOT true ACID transactions
- Uses optimistic locking with best-effort rollback
- Rollback may fail, leaving partial updates
- Race conditions possible on non-tracked keys
When to use:
- Low-contention scenarios where conflicts are rare
- Non-critical data where eventual consistency is acceptable
- Coordinating updates across multiple objects
When NOT to use:
- High-contention scenarios (use Redis locks or DynamoDB transactions)
- Financial transactions or critical data requiring strict consistency
- Operations that must be atomic across distributed systems
For true ACID transactions, consider:
- DynamoDB Transactions (TransactWriteItems)
- Redis-based distributed locks
- Application-level saga pattern with compensation
func (*OptimisticTransaction) Commit
func (tx *OptimisticTransaction) Commit(ctx context.Context) error
Commit attempts to commit all operations using optimistic locking. If any operation fails, attempts to rollback (best effort).
Returns an error if:
- Any ETag check fails (concurrent modification detected)
- Any write/delete operation fails
- Rollback fails (data may be in inconsistent state)
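The commit flow described here (validate the tracked versions, apply the writes, undo them on failure) can be sketched against an in-memory store. All names below are hypothetical, an integer version stands in for the ETag, and a `failKey` field simulates a backend error. Note that validation and writing are separate steps, which is why such a commit is best-effort rather than atomic.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("concurrent modification detected")

type versioned struct {
	value   string
	version int // stands in for an ETag
}

type store struct {
	objects map[string]versioned
	failKey string // writes to this key fail, simulating a backend error
}

func (s *store) write(key, value string) error {
	if key != "" && key == s.failKey {
		return errors.New("simulated write failure")
	}
	s.objects[key] = versioned{value, s.objects[key].version + 1}
	return nil
}

type tx struct {
	s            *store
	readVersions map[string]int
	writes       map[string]string
}

func (s *store) begin() *tx {
	return &tx{s: s, readVersions: map[string]int{}, writes: map[string]string{}}
}

// get reads a value and remembers the version it saw.
func (t *tx) get(key string) string {
	obj := t.s.objects[key]
	t.readVersions[key] = obj.version
	return obj.value
}

// put queues a write; nothing is stored until commit.
func (t *tx) put(key, value string) { t.writes[key] = value }

func (t *tx) commit() error {
	// 1. Every tracked key must still be at the version we read.
	for key, seen := range t.readVersions {
		if t.s.objects[key].version != seen {
			return errConflict
		}
	}
	// 2. Apply writes, remembering previous state for rollback.
	type undoEntry struct {
		prev    versioned
		existed bool
	}
	undo := map[string]undoEntry{}
	for key, value := range t.writes {
		prev, existed := t.s.objects[key]
		if err := t.s.write(key, value); err != nil {
			// 3. Best-effort rollback of the writes already applied.
			for k, u := range undo {
				if u.existed {
					t.s.objects[k] = u.prev
				} else {
					delete(t.s.objects, k)
				}
			}
			return err
		}
		undo[key] = undoEntry{prev, existed}
	}
	return nil
}

func main() {
	s := &store{objects: map[string]versioned{"acct/a": {"100", 1}}}
	t1, t2 := s.begin(), s.begin()
	t1.get("acct/a")
	t2.get("acct/a")
	t1.put("acct/a", "90")
	t2.put("acct/a", "80")
	fmt.Println(t1.commit()) // <nil>
	fmt.Println(t2.commit()) // concurrent modification detected
}
```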
func (*OptimisticTransaction) Delete
func (tx *OptimisticTransaction) Delete(key string)
Delete queues a delete operation
func (*OptimisticTransaction) Get
func (tx *OptimisticTransaction) Get(ctx context.Context, key string, dest interface{}) error
Get retrieves a value and tracks its ETag for optimistic locking
func (*OptimisticTransaction) Put
func (tx *OptimisticTransaction) Put(key string, value interface{})
Put queues a write operation
type ProfileSummary
type ProfileSummary struct {
TotalQueries int
SlowQueries int
FullScans int
Fallbacks int
AverageDuration time.Duration
P50Duration time.Duration
P95Duration time.Duration
P99Duration time.Duration
ByMethod map[string]MethodStats
ByComplexity map[QueryComplexity]int
}
ProfileSummary provides an aggregated view of query performance metrics.
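How the P50, P95, and P99 fields are computed is not documented here. One common choice is the nearest-rank method over sorted durations, sketched below; `percentile`, `ms`, and `msSlice` are hypothetical helpers, and the profiler may use a different estimator.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"time"
)

// ms converts a millisecond count to a time.Duration.
func ms(n int) time.Duration { return time.Duration(n) * time.Millisecond }

// msSlice builds a slice of durations from millisecond counts.
func msSlice(ns ...int) []time.Duration {
	out := make([]time.Duration, len(ns))
	for i, n := range ns {
		out[i] = ms(n)
	}
	return out
}

// percentile returns the nearest-rank p-th percentile (0 < p <= 100).
func percentile(durations []time.Duration, p float64) time.Duration {
	if len(durations) == 0 {
		return 0
	}
	sorted := append([]time.Duration(nil), durations...) // leave the input untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	rank := int(math.Ceil(p * float64(len(sorted)) / 100))
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

func main() {
	samples := msSlice(30, 10, 100, 20, 50, 40, 60, 90, 70, 80)
	fmt.Println(percentile(samples, 50), percentile(samples, 95), percentile(samples, 99))
}
```

With only a handful of samples the high percentiles collapse onto the maximum, so P95 and P99 are more informative once many queries have been recorded.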
type PrometheusMetrics
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements the Metrics interface using Prometheus
func NewPrometheusMetrics
func NewPrometheusMetrics(registry *prometheus.Registry) *PrometheusMetrics
NewPrometheusMetrics creates a new Prometheus metrics instance. If registry is nil, it uses the default Prometheus registry.
func (*PrometheusMetrics) Gauge
func (p *PrometheusMetrics) Gauge(name string, value float64, tags ...string)
Gauge sets a Prometheus gauge value
func (*PrometheusMetrics) GetRegistry
func (p *PrometheusMetrics) GetRegistry() *prometheus.Registry
GetRegistry returns the underlying Prometheus registry
func (*PrometheusMetrics) Histogram
func (p *PrometheusMetrics) Histogram(name string, value float64, tags ...string)
Histogram records a value in a Prometheus histogram
func (*PrometheusMetrics) Increment
func (p *PrometheusMetrics) Increment(name string, tags ...string)
Increment increments a Prometheus counter
type Query
type Query struct {
// contains filtered or unexported fields
}
Query provides a fluent interface for querying objects in Smarterbase
func (*Query) All
All executes the query and unmarshals all matching objects into dest. dest should be a pointer to a slice of the appropriate type.
func (*Query) Filter
Filter adds a filter function to the query. The filter receives raw JSON bytes and should return true if the object matches.
func (*Query) FilterJSON ΒΆ
FilterJSON adds a filter function that works with unmarshaled objects This is a convenience wrapper around Filter
type QueryBuilder ΒΆ
type QueryBuilder struct {
// contains filtered or unexported fields
}
QueryBuilder provides common query patterns
func NewQueryBuilder
func NewQueryBuilder(store *Store) *QueryBuilder
NewQueryBuilder creates a new query builder
func (*QueryBuilder) CreatedAfter
func (qb *QueryBuilder) CreatedAfter(prefix string, after time.Time) *Query
CreatedAfter finds all objects with a created_at field after the given time
func (*QueryBuilder) FieldContains
func (qb *QueryBuilder) FieldContains(prefix, fieldName, substring string) *Query
FieldContains finds all objects where a string field contains a substring
func (*QueryBuilder) FieldEquals
func (qb *QueryBuilder) FieldEquals(prefix, fieldName string, value interface{}) *Query
FieldEquals finds all objects where a field equals a value
type QueryComplexity
type QueryComplexity string
QueryComplexity represents the time complexity of a query
const (
ComplexityO1 QueryComplexity = "O(1)" // ComplexityO1 represents Redis index lookup
ComplexityOLogN QueryComplexity = "O(log N)" // ComplexityOLogN represents binary search operations
ComplexityON QueryComplexity = "O(N)" // ComplexityON represents full table scan
ComplexityONM QueryComplexity = "O(N*M)" // ComplexityONM represents nested loop operations
ComplexityONLogN QueryComplexity = "O(N log N)" // ComplexityONLogN represents sorting operations
)
Query complexity constants for profiling and optimization
type QueryProfile
type QueryProfile struct {
Method string // "ListUserSessions", "GetVisionCardsByPostcode"
StartTime time.Time
Duration time.Duration
Complexity QueryComplexity // O(1), O(N), O(N*M)
IndexUsed string // "redis:sessions-by-user-id" or "none:full-scan"
ResultCount int
FilterFields []string // ["user_id", "status"]
FallbackPath bool // Did we fall back from index to scan?
StorageOps int // Number of backend Get/List operations
Error error // Any error that occurred
}
QueryProfile tracks execution details for a single query
type QueryProfiler
type QueryProfiler struct {
// contains filtered or unexported fields
}
QueryProfiler collects and reports query performance
func GetProfilerFromContext
func GetProfilerFromContext(ctx context.Context) *QueryProfiler
GetProfilerFromContext retrieves the profiler from context
func NewQueryProfiler
func NewQueryProfiler() *QueryProfiler
NewQueryProfiler creates a new query profiler
func (*QueryProfiler) GetFallbacks
func (p *QueryProfiler) GetFallbacks() []QueryProfile
GetFallbacks returns queries that fell back to full scans
func (*QueryProfiler) GetFullScans
func (p *QueryProfiler) GetFullScans() []QueryProfile
GetFullScans returns queries that performed full scans
func (*QueryProfiler) GetProfiles
func (p *QueryProfiler) GetProfiles() []QueryProfile
GetProfiles returns all recorded profiles
func (*QueryProfiler) GetSlowQueries
func (p *QueryProfiler) GetSlowQueries() []QueryProfile
GetSlowQueries returns queries that exceeded the slow query threshold
func (*QueryProfiler) GetSummary
func (p *QueryProfiler) GetSummary() ProfileSummary
GetSummary returns a statistical summary of all profiles
func (*QueryProfiler) PrintSummary
func (p *QueryProfiler) PrintSummary()
PrintSummary prints a formatted summary to stdout
func (*QueryProfiler) Record
func (p *QueryProfiler) Record(profile *QueryProfile)
Record records a completed query profile
func (*QueryProfiler) SetEnabled
func (p *QueryProfiler) SetEnabled(enabled bool)
SetEnabled enables or disables profiling
func (*QueryProfiler) SetSlowQueryThreshold
func (p *QueryProfiler) SetSlowQueryThreshold(d time.Duration)
SetSlowQueryThreshold sets the duration threshold for slow queries
func (*QueryProfiler) StartProfile
func (p *QueryProfiler) StartProfile(method string) *QueryProfile
StartProfile begins profiling a query
type RedisIndexer
type RedisIndexer struct {
// contains filtered or unexported fields
}
RedisIndexer provides fast multi-value secondary indexes using Redis Sets.
Purpose: Enables O(1) lookups for non-unique indexes like:
- user_id → [session1, session2, ...]
- postcode → [vision_card1, vision_card2, ...]
- area_id → [photo1, photo2, ...]
Performance: Prevents expensive O(N) scans of all objects in S3/filesystem.
Architecture:
- File-based Indexer: Unique 1:1 mappings (email → user)
- RedisIndexer: Multi-value 1:N mappings (user_id → sessions)
Circuit Breaker: Prevents cascading failures when Redis is unavailable. After 5 consecutive failures, operations fail fast for 30 seconds.
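The behavior described above (open after a run of consecutive failures, fail fast during a cooldown) can be sketched with the standard library alone. This is an illustrative model, not the code in circuit_breaker.go: the `breaker` type, its fields, and the error values are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	errOpen    = errors.New("circuit breaker open")
	errBackend = errors.New("backend unavailable") // stand-in for a Redis failure
)

// breaker opens after maxFailures consecutive failures and rejects calls
// until cooldown has elapsed since the last failure that tripped it.
type breaker struct {
	mu          sync.Mutex
	maxFailures int
	cooldown    time.Duration
	failures    int
	openedAt    time.Time
}

func newBreaker(maxFailures int, cooldown time.Duration) *breaker {
	return &breaker{maxFailures: maxFailures, cooldown: cooldown}
}

// call runs fn unless the breaker is open, in which case it fails fast.
func (b *breaker) call(fn func() error) error {
	b.mu.Lock()
	open := b.failures >= b.maxFailures && time.Since(b.openedAt) < b.cooldown
	b.mu.Unlock()
	if open {
		return errOpen
	}

	err := fn() // after the cooldown this acts as a trial call

	b.mu.Lock()
	defer b.mu.Unlock()
	if err != nil {
		b.failures++
		if b.failures >= b.maxFailures {
			b.openedAt = time.Now() // (re)start the cooldown window
		}
		return err
	}
	b.failures = 0 // any success closes the breaker
	return nil
}

func main() {
	b := newBreaker(5, 30*time.Second)
	for i := 1; i <= 6; i++ {
		err := b.call(func() error { return errBackend })
		fmt.Printf("call %d: %v\n", i, err)
	}
}
```

Failing fast keeps callers from queueing behind timeouts while the dependency is down, which is the cascading failure the breaker is meant to prevent.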
func NewRedisIndexer
func NewRedisIndexer(redis *redis.Client) *RedisIndexer
NewRedisIndexer creates a new Redis-backed indexer with circuit breaker protection. Circuit breaker opens after 5 consecutive failures and retries after 30 seconds.
func NewRedisIndexerWithOwnedClient
func NewRedisIndexerWithOwnedClient(redis *redis.Client) *RedisIndexer
NewRedisIndexerWithOwnedClient creates a new Redis indexer that owns the client. The client will be closed when Close() is called.
func (*RedisIndexer) Close
func (r *RedisIndexer) Close() error
Close releases resources held by the indexer. If the indexer owns the Redis client, it will be closed.
func (*RedisIndexer) Count
func (r *RedisIndexer) Count(ctx context.Context, entityType, indexName, indexValue string) (int64, error)
Count returns the number of objects matching an index value
func (*RedisIndexer) GetIndexStats
func (r *RedisIndexer) GetIndexStats(ctx context.Context, entityType, indexName string, indexValues []string) (map[string]int64, error)
GetIndexStats returns statistics about an index
func (*RedisIndexer) Query
func (r *RedisIndexer) Query(ctx context.Context, entityType, indexName, indexValue string) ([]string, error)
Query returns all object keys matching an index value
Example: Query(ctx, "sessions", "user_id", "user-123") → ["sessions/abc.json", "sessions/def.json"]
func (*RedisIndexer) QueryMultiple
func (r *RedisIndexer) QueryMultiple(ctx context.Context, entityType, indexName string, indexValues []string) ([]string, error)
QueryMultiple returns object keys matching ANY of the provided values (OR query)
Example: QueryMultiple(ctx, "properties", "user_id", []string{"user-1", "user-2"})
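To make the OR semantics concrete, here is a small in-memory model in which Go maps stand in for Redis Sets. The `setIndex` type and the `entityType:indexName:indexValue` entry format are assumptions for illustration; the real Redis key layout is not documented on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// setIndex maps an index entry (for example "properties:user_id:user-1")
// to the set of object keys stored under it, mimicking Redis Sets.
type setIndex map[string]map[string]struct{}

// entry builds the (hypothetical) name of one index set.
func entry(entityType, indexName, indexValue string) string {
	return entityType + ":" + indexName + ":" + indexValue
}

// add records objectKey under the given index value.
func (idx setIndex) add(entityType, indexName, indexValue, objectKey string) {
	e := entry(entityType, indexName, indexValue)
	if idx[e] == nil {
		idx[e] = make(map[string]struct{})
	}
	idx[e][objectKey] = struct{}{}
}

// queryMultiple returns the de-duplicated union of object keys that match
// ANY of the provided values.
func (idx setIndex) queryMultiple(entityType, indexName string, values []string) []string {
	seen := make(map[string]struct{})
	for _, v := range values {
		for key := range idx[entry(entityType, indexName, v)] {
			seen[key] = struct{}{}
		}
	}
	keys := make([]string, 0, len(seen))
	for key := range seen {
		keys = append(keys, key)
	}
	sort.Strings(keys) // sorted only to make the demo output stable
	return keys
}

func main() {
	idx := setIndex{}
	idx.add("properties", "user_id", "user-1", "properties/a.json")
	idx.add("properties", "user_id", "user-2", "properties/b.json")
	idx.add("properties", "user_id", "user-2", "properties/a.json")
	fmt.Println(idx.queryMultiple("properties", "user_id", []string{"user-1", "user-2"}))
}
```

An object indexed under more than one of the requested values appears once in the result, which is what a set union gives you.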
func (*RedisIndexer) RebuildIndex
func (r *RedisIndexer) RebuildIndex(ctx context.Context, spec *MultiIndexSpec, objects map[string][]byte) error
RebuildIndex rebuilds a secondary index from scratch
Useful for:
- Initial data migration
- Index repair after corruption
- Adding new indexes to existing data
func (*RedisIndexer) RegisterMultiIndex
func (r *RedisIndexer) RegisterMultiIndex(spec *MultiIndexSpec)
RegisterMultiIndex registers a multi-value index specification
func (*RedisIndexer) RemoveFromIndexes
RemoveFromIndexes removes an object from all indexes
Call this before Delete() operations:
redisIndexer.RemoveFromIndexes(ctx, key, oldData)
store.Delete(ctx, key)
func (*RedisIndexer) ReplaceIndexes
func (r *RedisIndexer) ReplaceIndexes(ctx context.Context, objectKey string, oldData, newData []byte) error
ReplaceIndexes atomically updates indexes when an object is modified
This removes the object from old index values and adds it to new ones. Call this for Update() operations:
oldData, _ := store.Backend().Get(ctx, key)
store.PutJSON(ctx, key, newObject)
redisIndexer.ReplaceIndexes(ctx, key, oldData, newData)
If oldData is nil/empty, behaves like UpdateIndexes (create case)
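The replace semantics (drop the key from the old value's set, add it to the new one, and treat empty old data as a create) can be modeled in memory as follows. This sketch assumes a single top-level string field and makes no attempt at the atomicity the real method advertises; `index`, `fieldValue`, and `replace` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// index maps an index value (e.g. a user ID) to the set of object keys.
type index map[string]map[string]struct{}

// fieldValue extracts a top-level string field from a JSON document.
// It returns "" when the data is empty, invalid, or lacks the field.
func fieldValue(data []byte, field string) string {
	if len(data) == 0 {
		return ""
	}
	var doc map[string]interface{}
	if err := json.Unmarshal(data, &doc); err != nil {
		return ""
	}
	s, _ := doc[field].(string)
	return s
}

// replace moves objectKey from the old field value's set to the new one.
// With empty oldData it only adds the key (the create case).
func (idx index) replace(field, objectKey string, oldData, newData []byte) {
	if oldVal := fieldValue(oldData, field); oldVal != "" {
		delete(idx[oldVal], objectKey) // deleting from a nil map is a no-op
		if len(idx[oldVal]) == 0 {
			delete(idx, oldVal)
		}
	}
	if newVal := fieldValue(newData, field); newVal != "" {
		if idx[newVal] == nil {
			idx[newVal] = make(map[string]struct{})
		}
		idx[newVal][objectKey] = struct{}{}
	}
}

func main() {
	idx := index{}
	v1 := []byte(`{"user_id":"user-1"}`)
	v2 := []byte(`{"user_id":"user-2"}`)
	idx.replace("user_id", "sessions/abc.json", nil, v1) // create
	idx.replace("user_id", "sessions/abc.json", v1, v2)  // update
	fmt.Println(len(idx["user-1"]), len(idx["user-2"]))
}
```

Passing the previous bytes is what lets the indexer find the stale entry; without them the key would stay listed under its old value.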
func (*RedisIndexer) UpdateIndexes
UpdateIndexes updates all registered multi-value indexes for an object
Call this after Put() operations:
store.PutJSON(ctx, key, session)
redisIndexer.UpdateIndexes(ctx, key, data)
type RepairReport
type RepairReport struct {
IndexType string
Validated int
Repaired int
Errors []string
MissingIndexes []string
OrphanedIndexes []string
}
RepairReport contains results from an index repair operation
type RetryConfig
type RetryConfig struct {
MaxRetries int
InitialBackoff time.Duration
BackoffMultiple int
JitterPercent float64
}
RetryConfig holds configuration for retry operations with exponential backoff
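How these fields combine is not spelled out here. One plausible reading, shown below as a sketch, is delay = InitialBackoff × BackoffMultiple^attempt, perturbed by a random jitter. The local `retryConfig` mirror type, the `baseDelay` and `jittered` helpers, and the interpretation of JitterPercent as a fraction (0.2 meaning ±20%) are all assumptions.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retryConfig mirrors the documented fields for illustration only.
type retryConfig struct {
	MaxRetries      int
	InitialBackoff  time.Duration
	BackoffMultiple int
	JitterPercent   float64 // assumed to be a fraction, e.g. 0.2 for ±20%
}

// baseDelay returns InitialBackoff * BackoffMultiple^attempt (attempt starts at 0).
func (c retryConfig) baseDelay(attempt int) time.Duration {
	d := c.InitialBackoff
	for i := 0; i < attempt; i++ {
		d *= time.Duration(c.BackoffMultiple)
	}
	return d
}

// jittered scales the base delay by a factor in [1-J, 1+J), where rnd is a
// random number in [0, 1) supplied by the caller.
func (c retryConfig) jittered(attempt int, rnd float64) time.Duration {
	base := c.baseDelay(attempt)
	offset := (rnd*2 - 1) * c.JitterPercent
	return base + time.Duration(float64(base)*offset)
}

func main() {
	cfg := retryConfig{
		MaxRetries:      4,
		InitialBackoff:  100 * time.Millisecond,
		BackoffMultiple: 2,
		JitterPercent:   0.2,
	}
	for attempt := 0; attempt < cfg.MaxRetries; attempt++ {
		fmt.Printf("attempt %d: base=%v jittered=%v\n",
			attempt, cfg.baseDelay(attempt), cfg.jittered(attempt, rand.Float64()))
	}
}
```

Jitter spreads retries from many clients over time so they do not all hit the backend at the same instant after a shared failure.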
func DefaultRetryConfig
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns the default retry configuration
func (RetryConfig) Validate
func (c RetryConfig) Validate() error
Validate checks if the RetryConfig is valid
type S3Backend
type S3Backend struct {
// contains filtered or unexported fields
}
S3Backend implements Backend using AWS S3 (or S3-compatible storage)
func (*S3Backend) Append
Append appends data to an existing S3 object using read-modify-write.
⚠️ WARNING: This is NOT atomic. There's a race window between Get and Put. For high-concurrency append scenarios, consider:
- Using DynamoDB for coordination
- S3 Transfer Acceleration with versioning
- Application-level locking (Redis)
For append-only logs (JSONL), race conditions are acceptable if:
- Events have unique IDs (deduplication downstream)
- Lost appends can be replayed from source (Redis Streams)
func (*S3Backend) GetWithETag
GetWithETag retrieves data and its ETag for optimistic locking from S3
func (*S3Backend) ListPaginated
func (b *S3Backend) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
ListPaginated streams keys with the given prefix in batches from S3
func (*S3Backend) PutIfMatch
func (b *S3Backend) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch provides best-effort optimistic locking for S3.
⚠️ CRITICAL RACE CONDITION WARNING ⚠️
This implementation has an unavoidable race window between HeadObject and PutObject that can lead to lost updates in concurrent scenarios.
Race condition timeline:
T1: Thread A calls HeadObject, gets ETag "abc" ✓
T2: Thread B calls PutObject, writes new data (ETag becomes "xyz")
T3: Thread A calls PutObject with expectedETag="abc" → SUCCEEDS (should fail!)
Result: Thread B's update is lost!
Root cause: the ETag check (HeadObject) and the write (PutObject) are separate requests, so they are not atomic. S3 PutObject historically did not accept If-Match headers (AWS added conditional writes in late 2024), and this implementation does not rely on them.
❌ DO NOT USE for:
- Financial data (balances, transactions, payments)
- Counters or sequences that must be accurate
- Any data where lost updates are unacceptable
- High-concurrency scenarios (>1 update/sec per key)
✅ Safe to use for:
- Low-traffic scenarios (<1 update/min per key)
- Data where occasional inconsistency is acceptable
- Non-critical metadata or cache invalidation
✅ Better alternatives:
- DynamoDB with conditional writes (true atomic compare-and-swap)
- Redis with WATCH/MULTI/EXEC (atomic transactions)
- Application-level distributed locks (Redis, etcd, Consul)
- Event sourcing with append-only logs (no overwrites)
Example of proper usage (DynamoDB):
UpdateItemInput{
ConditionExpression: "version = :expectedVersion",
UpdateExpression: "SET #data = :data, version = version + 1",
}
If you must use S3 for this, consider implementing application-level locking with Redis or adding a coordination layer with DynamoDB.
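The lost-update interleaving from the timeline can be replayed deterministically with a toy in-memory object. Nothing here talks to S3; `bucket`, `head`, and `put` are hypothetical stand-ins for the HeadObject and PutObject steps, and the integer ETag is a simplification.

```go
package main

import "fmt"

// object is a stored value plus a version counter standing in for an ETag.
type object struct {
	data string
	etag int
}

// bucket is a toy single-key store used only to replay the interleaving.
type bucket struct{ obj object }

// head returns the current ETag (the "HeadObject" step).
func (b *bucket) head() int { return b.obj.etag }

// put overwrites unconditionally and bumps the ETag (the "PutObject" step).
func (b *bucket) put(data string) {
	b.obj = object{data: data, etag: b.obj.etag + 1}
}

// simulateLostUpdate replays steps T1..T3 in order and reports the final
// contents and whether B's write was overwritten.
func simulateLostUpdate() (final string, bWasLost bool) {
	b := &bucket{obj: object{data: "initial", etag: 1}}
	expectedETag := 1

	checkPassed := b.head() == expectedETag // T1: A's check passes
	b.put("update from B")                  // T2: B writes, ETag is now 2
	if checkPassed {                        // T3: A writes based on a stale check
		b.put("update from A")
	}
	return b.obj.data, b.obj.data != "update from B"
}

func main() {
	final, lost := simulateLostUpdate()
	fmt.Printf("final=%q, B's update lost=%v\n", final, lost)
}
```

Because the check and the write are two separate steps, nothing ties A's write to the ETag it verified. Closing that gap requires either a conditional write performed by the storage service itself or a lock held across both steps.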
type S3BackendWithRedisLock
type S3BackendWithRedisLock struct {
*S3Backend
// contains filtered or unexported fields
}
S3BackendWithRedisLock wraps S3Backend with distributed locking to eliminate the race condition in PutIfMatch operations.
Race condition eliminated:
T1: Thread A acquires lock for key
T2: Thread A: HeadObject (get ETag)
T3: Thread A: PutObject (write)
T4: Thread A releases lock
→ No other thread can modify the object while A holds the lock
Use this for:
- Critical data requiring strong consistency (financial, counters)
- High-concurrency scenarios
- Multi-instance deployments
Note: Requires Redis for distributed locking
func NewS3BackendWithRedisLock
func NewS3BackendWithRedisLock(client *s3.Client, bucket string, redisClient *redis.Client) *S3BackendWithRedisLock
NewS3BackendWithRedisLock creates an S3 backend with distributed locking
func NewS3BackendWithRedisLockCustom
func NewS3BackendWithRedisLockCustom(
client *s3.Client,
bucket string,
redisClient *redis.Client,
lockTTL time.Duration,
maxRetries int,
) *S3BackendWithRedisLock
NewS3BackendWithRedisLockCustom creates an S3 backend with custom lock settings
func (*S3BackendWithRedisLock) Append
Append overrides the base implementation with distributed locking. This ensures atomic append operations across multiple processes.
func (*S3BackendWithRedisLock) Close
func (b *S3BackendWithRedisLock) Close() error
Close releases resources held by the backend
func (*S3BackendWithRedisLock) PutIfMatch
func (b *S3BackendWithRedisLock) PutIfMatch(ctx context.Context, key string, data []byte, expectedETag string) (string, error)
PutIfMatch overrides the base implementation with distributed locking. This eliminates the race condition present in the base S3Backend implementation.
type StdLogger
type StdLogger struct {
// contains filtered or unexported fields
}
StdLogger uses the standard library log package. This is a simple implementation for development.
func NewStdLogger
NewStdLogger creates a logger that writes to standard output
type Store
type Store struct {
// contains filtered or unexported fields
}
Store provides high-level operations on top of a Backend. It is completely domain-agnostic and works with any JSON-serializable type.
func NewStoreWithLogger
NewStoreWithLogger creates a new store with a custom logger
func NewStoreWithObservability
NewStoreWithObservability creates a new store with logging and metrics
func (*Store) Backend
Backend returns the underlying backend (for advanced use cases like index repair)
func (*Store) BatchDelete
func (s *Store) BatchDelete(ctx context.Context, keys []string) []BatchOperation
BatchDelete deletes multiple objects in parallel for improved performance.
This method executes all deletions concurrently, making it significantly faster than sequential Delete calls when removing multiple objects.
Basic usage:
keys := []string{"users/1", "users/2", "users/3"}
results := store.BatchDelete(ctx, keys)
Error handling:
results := store.BatchDelete(ctx, keys)
for _, result := range results {
if result.Error != nil {
log.Printf("Failed to delete %s: %v", result.Key, result.Error)
}
}
Analyzing results:
results := store.BatchDelete(ctx, keys)
analysis := smarterbase.AnalyzeBatchResults(results)
if analysis.Failed > 0 {
log.Printf("Deletion failed for %d/%d keys", analysis.Failed, analysis.Total)
}
Returns a slice of BatchOperation results (one per key), containing any errors that occurred.
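For readers curious how a "one result per key" parallel batch can be structured, here is a generic fan-out sketch using goroutines and a WaitGroup. It is not the package's implementation: `batchResult`, `batchDo`, and `countFailed` are hypothetical, and whether the real methods bound concurrency or preserve input order is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errMissing = errors.New("object not found") // demo error

// batchResult pairs a key with the outcome of its operation.
type batchResult struct {
	Key   string
	Error error
}

// batchDo runs op for every key concurrently and returns one result per
// key, in the same order as the input slice.
func batchDo(keys []string, op func(key string) error) []batchResult {
	results := make([]batchResult, len(keys))
	var wg sync.WaitGroup
	for i, key := range keys {
		wg.Add(1)
		go func(i int, key string) {
			defer wg.Done()
			// Each goroutine writes only its own slot, so no lock is needed.
			results[i] = batchResult{Key: key, Error: op(key)}
		}(i, key)
	}
	wg.Wait()
	return results
}

// countFailed returns how many operations reported an error.
func countFailed(results []batchResult) int {
	failed := 0
	for _, r := range results {
		if r.Error != nil {
			failed++
		}
	}
	return failed
}

func main() {
	keys := []string{"users/1", "users/2", "users/3"}
	results := batchDo(keys, func(key string) error {
		if key == "users/2" {
			return errMissing
		}
		return nil
	})
	fmt.Printf("failed %d of %d\n", countFailed(results), len(results))
}
```

Returning per-key results instead of a single error lets the caller retry only the keys that failed.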
func (*Store) BatchExists
BatchExists checks if multiple keys exist in parallel.
This method executes all existence checks concurrently, making it significantly faster than sequential Exists calls when checking many keys.
Basic usage:
keys := []string{"users/1", "users/2", "users/3"}
results := store.BatchExists(ctx, keys)
for key, exists := range results {
if exists {
fmt.Printf("%s exists\n", key)
}
}
Filtering existing keys:
results := store.BatchExists(ctx, keys)
existingKeys := make([]string, 0)
for _, key := range keys {
if results[key] {
existingKeys = append(existingKeys, key)
}
}
Returns a map of key -> boolean indicating whether each key exists. If an error occurs checking a key, it's treated as not existing (false).
func (*Store) BatchGetJSON
func (s *Store) BatchGetJSON(ctx context.Context, keys []string, destType interface{}) (map[string]interface{}, error)
BatchGetJSON retrieves multiple JSON objects in parallel.
Important: This is the older batch API. Prefer using BatchGet[T]() from helpers.go for type-safe batch reads with automatic unmarshaling and better error handling:
users, err := smarterbase.BatchGet[User](ctx, store, keys)
This method returns a map of successfully fetched objects. Failed retrievals are silently skipped - check the return map for missing keys to detect failures.
Basic usage:
keys := []string{"users/1", "users/2", "users/3"}
results, err := store.BatchGetJSON(ctx, keys, nil)
for key, value := range results {
// Process value
}
Detecting missing keys:
results, err := store.BatchGetJSON(ctx, keys, nil)
for _, key := range keys {
if _, found := results[key]; !found {
log.Printf("Key %s was not found or failed to fetch", key)
}
}
Returns a map of key -> value for successful retrievals. Failed retrievals are omitted.
func (*Store) BatchPutJSON
func (s *Store) BatchPutJSON(ctx context.Context, items map[string]interface{}) []BatchOperation
BatchPutJSON stores multiple JSON objects in parallel for improved performance.
This method executes all writes concurrently, making it significantly faster than sequential PutJSON calls when writing multiple objects.
Basic usage:
items := map[string]interface{}{
"users/1": &User{ID: "1", Email: "alice@example.com"},
"users/2": &User{ID: "2", Email: "bob@example.com"},
"users/3": &User{ID: "3", Email: "carol@example.com"},
}
results := store.BatchPutJSON(ctx, items)
Error handling:
results := store.BatchPutJSON(ctx, items)
for _, result := range results {
if result.Error != nil {
log.Printf("Failed to write %s: %v", result.Key, result.Error)
}
}
Analyzing results:
results := store.BatchPutJSON(ctx, items)
analysis := smarterbase.AnalyzeBatchResults(results)
fmt.Printf("Success: %d/%d, Failed: %d\n",
analysis.Successful, analysis.Total, analysis.Failed)
Note: This is the older batch API. For the read side, consider smarterbase.BatchGet[T]() from helpers.go, which provides type-safe batch reads with automatic unmarshaling.
Returns a slice of BatchOperation results (one per key), containing any errors that occurred.
func (*Store) BeginTx
func (s *Store) BeginTx(ctx context.Context) *OptimisticTransaction
BeginTx creates a new optimistic transaction
func (*Store) GetJSON
GetJSON fetches and unmarshals a JSON object from storage, applying migrations if needed.
This is the primary method for reading data from smarterbase. It automatically handles schema migrations when the stored data version doesn't match the expected version.
Basic usage:
var user User
err := store.GetJSON(ctx, "users/123.json", &user)
if smarterbase.IsNotFound(err) {
// User doesn't exist
}
With schema versioning:
type User struct {
V int `json:"_v"`
ID string `json:"id"`
FirstName string `json:"first_name"`
}
// Register migration
smarterbase.Migrate("User").From(0).To(1).
Split("name", " ", "first_name", "last_name")
// Old data (v0) is automatically migrated to v1
var user User
user.V = 1 // Expected version
store.GetJSON(ctx, "users/old-user.json", &user)
Migration behavior depends on the store's migration policy:
- MigrateOnRead (default): Migrates data in memory only
- MigrateAndWrite: Migrates data and writes it back to storage
Error handling:
err := store.GetJSON(ctx, key, &user)
if smarterbase.IsNotFound(err) {
// Key doesn't exist in storage
} else if err != nil {
// Other error (network, permissions, migration failure, etc.)
}
Performance: ~50ns overhead when no migrations are registered. Migration adds 2-5ms per version step.
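To show what a v0 to v1 step like the Split("name", " ", "first_name", "last_name") migration above might do to a stored document, the sketch below applies the same transformation to raw JSON with the standard library. It is an approximation: the function name `migrateV0ToV1`, the removal of the source field, and the handling of single-word names are assumptions, not documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// migrateV0ToV1 splits "name" into "first_name" and "last_name" and stamps
// the document with "_v": 1. Documents already at v1 or later are returned
// unchanged.
func migrateV0ToV1(raw []byte) ([]byte, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	// JSON numbers decode as float64; a missing "_v" is treated as version 0.
	if v, _ := doc["_v"].(float64); v >= 1 {
		return raw, nil
	}
	if name, ok := doc["name"].(string); ok {
		parts := strings.SplitN(name, " ", 2)
		doc["first_name"] = parts[0]
		doc["last_name"] = ""
		if len(parts) == 2 {
			doc["last_name"] = parts[1]
		}
		delete(doc, "name") // assumption: the source field is dropped
	}
	doc["_v"] = 1
	return json.Marshal(doc)
}

func main() {
	out, err := migrateV0ToV1([]byte(`{"id":"u1","name":"Alice Smith"}`))
	if err != nil {
		fmt.Println("migration failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

Under MigrateOnRead a result like this would exist only in memory, while MigrateAndWrite would also persist it, as described above.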
func (*Store) GetJSONWithETag
GetJSONWithETag fetches JSON and returns its ETag for optimistic locking, applying migrations if needed.
Use this method when you need to implement optimistic concurrency control. The returned ETag can be passed to PutJSONWithETag to ensure the data hasn't changed between read and write.
Basic usage:
var user User
etag, err := store.GetJSONWithETag(ctx, "users/123", &user)
if err != nil {
return err
}
// Modify user
user.LoginCount++
// Write with ETag check
_, err = store.PutJSONWithETag(ctx, "users/123", &user, etag)
With migrations and MigrateAndWrite policy:
store.WithMigrationPolicy(smarterbase.MigrateAndWrite)
var user User
user.V = 2 // Expected version
etag, err := store.GetJSONWithETag(ctx, "users/old-user", &user)
// Note: If data was migrated and written back, the returned ETag is now stale
// You should refetch if you need the current ETag
ETag behavior with migrations:
- If no migration needed: Returns current ETag
- If migration happens in-memory only: Returns ETag of original data
- If MigrateAndWrite policy: ETag becomes stale after write-back (refetch recommended)
Returns the ETag string and unmarshaled data in dest, or error if read/migration fails.
func (*Store) ListPaginated
func (s *Store) ListPaginated(ctx context.Context, prefix string, handler func(keys []string) error) error
ListPaginated processes keys in batches
func (*Store) MarshalObject
MarshalObject marshals an object to JSON (utility function). Renamed from MarshalJSON to avoid conflict with the json.Marshaler interface.
func (*Store) NewBatchWriter
func (s *Store) NewBatchWriter(batchSize int) *BatchWriter
NewBatchWriter creates a new batch writer
func (*Store) PutJSON
PutJSON marshals and stores a JSON object to storage.
This is the primary method for writing data to smarterbase. It marshals the value to JSON and stores it at the specified key.
Basic usage:
user := &User{
ID: smarterbase.NewID(),
Email: "alice@example.com",
Name: "Alice",
}
err := store.PutJSON(ctx, "users/"+user.ID, user)
With schema versioning:
user := &User{
V: 1, // Current version
ID: smarterbase.NewID(),
FirstName: "Alice",
LastName: "Smith",
}
err := store.PutJSON(ctx, "users/"+user.ID, user)
Important notes:
- PutJSON overwrites existing data unconditionally (no ETag check)
- For conditional updates, use PutJSONWithETag instead
- For race-free updates, use WithAtomicUpdate with distributed locks
Error handling:
err := store.PutJSON(ctx, key, user)
if err != nil {
// Error could be: marshaling failure, network error, permissions, etc.
}
func (*Store) PutJSONWithETag
func (s *Store) PutJSONWithETag(ctx context.Context, key string, value interface{}, expectedETag string) (string, error)
PutJSONWithETag stores JSON with optimistic locking using ETag validation.
This method provides optimistic concurrency control. It only writes the data if the current ETag matches expectedETag, preventing lost updates from concurrent modifications.
Basic usage pattern (read-modify-write):
// 1. Read with ETag
var user User
etag, err := store.GetJSONWithETag(ctx, "users/123", &user)
// 2. Modify
user.Name = "Alice Smith"
// 3. Write with ETag check
newETag, err := store.PutJSONWithETag(ctx, "users/123", &user, etag)
if smarterbase.IsConflict(err) {
// Someone else modified the user between read and write
// Retry the operation
}
Common pattern with retry:
config := smarterbase.DefaultRetryConfig()
for i := 0; i < config.MaxRetries; i++ {
var user User
etag, err := store.GetJSONWithETag(ctx, key, &user)
if err != nil {
return err
}
user.Balance += 100
_, err = store.PutJSONWithETag(ctx, key, &user, etag)
if err == nil {
return nil // Success
}
if !smarterbase.IsConflict(err) {
return err // Permanent error
}
// ETag conflict - retry
}
Important notes:
- For critical operations (financial transactions), use WithAtomicUpdate with distributed locks
- PutJSONWithETag provides optimistic locking but NOT true isolation
- Always use S3BackendWithRedisLock for production multi-writer scenarios
Returns the new ETag on success, or error if write fails or ETag doesn't match.
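The read, modify, conditional-write, retry cycle can be exercised end to end against a small in-memory store. In this sketch the compare and the write happen under one mutex, so the conditional write is genuinely atomic; that is a property of the toy `memStore`, not a claim about any particular backend. All names here are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("etag mismatch")

// memStore holds one balance plus a version counter standing in for an ETag.
type memStore struct {
	mu      sync.Mutex
	balance int
	etag    int
}

// get returns the current balance together with its ETag.
func (s *memStore) get() (balance, etag int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.balance, s.etag
}

// putIfMatch writes only if the ETag is unchanged. The comparison and the
// write share one critical section, which makes the operation atomic.
func (s *memStore) putIfMatch(balance, expectedETag int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.etag != expectedETag {
		return errConflict
	}
	s.balance = balance
	s.etag++
	return nil
}

// deposit retries the read-modify-write cycle on conflict.
func deposit(s *memStore, amount, maxRetries int) error {
	for i := 0; i < maxRetries; i++ {
		balance, etag := s.get()
		err := s.putIfMatch(balance+amount, etag)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err // permanent error
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxRetries, errConflict)
}

// runDeposits starts workers concurrent deposits of 100 and returns the
// final balance.
func runDeposits(workers int) int {
	s := &memStore{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = deposit(s, 100, 10000)
		}()
	}
	wg.Wait()
	balance, _ := s.get()
	return balance
}

func main() {
	fmt.Println("final balance:", runDeposits(20))
}
```

Each retry must re-read the value before recomputing. Reusing the stale balance with a fresh ETag would reintroduce the lost update the check is meant to prevent.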
func (*Store) SetMetrics
SetMetrics updates the metrics collector for this store
func (*Store) WithMigrationPolicy
func (s *Store) WithMigrationPolicy(policy MigrationPolicy) *Store
WithMigrationPolicy sets the migration policy for this store
func (*Store) WithTransaction
func (s *Store) WithTransaction(ctx context.Context, fn func(tx *OptimisticTransaction) error) error
WithTransaction executes a function within an optimistic transaction. Automatically commits on success, rolls back on error.
⚠️ WARNING: This does NOT provide isolation guarantees! Another process can modify data between your Get() and Put() calls.
❌ DO NOT USE for critical updates like:
- Financial transactions (account balances, payments)
- Inventory updates
- Counter increments
- Any operation where race conditions would cause data corruption
✅ USE distributed locks instead for critical updates:
lock := smarterbase.NewDistributedLock(redisClient, "smarterbase")
err := smarterbase.WithAtomicUpdate(ctx, store, lock, "accounts/123", 10*time.Second,
func(ctx context.Context) error {
var account Account
if err := store.GetJSON(ctx, "accounts/123", &account); err != nil {
return err // Abort the update if the read fails
}
account.Balance += 100 // Safe: protected by distributed lock
return store.PutJSON(ctx, "accounts/123", &account)
})
Example (optimistic transaction - use only for low-contention scenarios):
err := store.WithTransaction(ctx, func(tx *OptimisticTransaction) error {
// Read with optimistic lock
var user User
if err := tx.Get(ctx, "users/123", &user); err != nil {
return err
}
// ⚠️ CAUTION: Another process could modify user here!
user.LastSeen = time.Now()
// Queue write (will check ETag on commit)
tx.Put("users/123", user)
return nil
})
type StripedLocks
type StripedLocks struct {
// contains filtered or unexported fields
}
StripedLocks provides fine-grained locking using multiple mutexes to reduce contention compared to a single global mutex.
How it works:
- Hash the key to determine which stripe (mutex) to use
- Multiple keys hash to different stripes → concurrent operations
- Same key always hashes to same stripe → consistency
Performance:
- 32 stripes = up to ~32x better concurrency than a single mutex (when keys spread evenly across stripes)
- Negligible memory overhead (~256 bytes)
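The hashing scheme described above fits in a few lines of Go. The sketch below uses FNV-1a from the standard library to pick a stripe; the actual hash function used by striped_lock.go is not stated here, so treat that choice, along with the lowercase `stripedLocks` type and `stripeFor` helper, as assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// stripedLocks spreads keys across a fixed set of mutexes.
type stripedLocks struct {
	stripes []sync.RWMutex
}

// newStripedLocks allocates n stripes (at least one).
func newStripedLocks(n int) *stripedLocks {
	if n < 1 {
		n = 1
	}
	return &stripedLocks{stripes: make([]sync.RWMutex, n)}
}

// stripeFor hashes the key to a stripe index; the same key always maps to
// the same stripe.
func (sl *stripedLocks) stripeFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(len(sl.stripes)))
}

// lock takes the exclusive lock for key's stripe and returns the unlock func.
func (sl *stripedLocks) lock(key string) func() {
	m := &sl.stripes[sl.stripeFor(key)]
	m.Lock()
	return m.Unlock
}

// rlock takes the shared read lock for key's stripe.
func (sl *stripedLocks) rlock(key string) func() {
	m := &sl.stripes[sl.stripeFor(key)]
	m.RLock()
	return m.RUnlock
}

func main() {
	locks := newStripedLocks(32)
	unlock := locks.lock("users/123")
	fmt.Println("users/123 maps to stripe", locks.stripeFor("users/123"))
	unlock()

	runlock := locks.rlock("users/123")
	runlock()
}
```

Two different keys can still collide on one stripe, so striping reduces contention without giving every key a private lock.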
func NewStripedLocks
func NewStripedLocks(stripeCount int) *StripedLocks
NewStripedLocks creates a new striped lock with the specified number of stripes. Recommended: 32 for most use cases, 128 for high-concurrency scenarios.
func (*StripedLocks) Lock
func (sl *StripedLocks) Lock(key string) func()
Lock acquires an exclusive lock for the given key. Returns an unlock function that MUST be called to release the lock.
Example:
unlock := locks.Lock(key)
defer unlock()
// ... critical section
func (*StripedLocks) RLock
func (sl *StripedLocks) RLock(key string) func()
RLock acquires a shared read lock for the given key. Multiple readers can hold the lock simultaneously.
Example:
unlock := locks.RLock(key)
defer unlock()
// ... read operation
type Transaction
type Transaction = OptimisticTransaction
Transaction is deprecated. Use OptimisticTransaction instead. Kept for backward compatibility.
type UniqueConstraint (added in v2.0.3)
type UniqueConstraint struct {
EntityType string // e.g., "users", "admin_users"
FieldName string // e.g., "email", "platform_user_id"
GetValue func(data interface{}) (string, error) // Extract value from data
Normalize func(value string) string // Optional: normalize before storing (e.g., lowercase email)
}
UniqueConstraint defines a field that must be unique across all entities of a type.
Example: Email uniqueness for users
constraint := &UniqueConstraint{
EntityType: "users",
FieldName: "email",
GetValue: func(data interface{}) (string, error) { return data.(*User).Email, nil },
}
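The role of the optional Normalize hook is easiest to see with a case-insensitive email check. The following in-memory sketch is a stand-in for whatever Redis-backed mechanism redis_constraints.go uses; `uniqueIndex`, `claim`, and `lowerTrim` are hypothetical names introduced for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errDuplicate = errors.New("unique constraint violated")

// uniqueIndex tracks which entity ID owns each normalized value.
type uniqueIndex struct {
	normalize func(string) string
	owners    map[string]string
}

func newUniqueIndex(normalize func(string) string) *uniqueIndex {
	return &uniqueIndex{normalize: normalize, owners: make(map[string]string)}
}

// lowerTrim is an example normalizer for email addresses.
func lowerTrim(v string) string {
	return strings.ToLower(strings.TrimSpace(v))
}

// claim reserves value for id. It fails if a different id already owns the
// normalized value, and is a no-op if id already owns it.
func (u *uniqueIndex) claim(id, value string) error {
	if u.normalize != nil {
		value = u.normalize(value)
	}
	if owner, taken := u.owners[value]; taken && owner != id {
		return errDuplicate
	}
	u.owners[value] = id
	return nil
}

func main() {
	emails := newUniqueIndex(lowerTrim)
	fmt.Println(emails.claim("user-1", "Alice@Example.com"))
	fmt.Println(emails.claim("user-2", " alice@example.com "))
}
```

Without normalization the two addresses in `main` would be stored as distinct values and the duplicate would go undetected.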
type ZapLogger
type ZapLogger struct {
// contains filtered or unexported fields
}
ZapLogger adapts go.uber.org/zap logger to the Smarterbase Logger interface
func NewDevelopmentZapLogger
NewDevelopmentZapLogger creates a development Zap logger. This is optimized for human-readable console output.
func NewProductionZapLogger
NewProductionZapLogger creates a production-ready Zap logger. This is a convenience function for common use cases.
func NewZapLogger
NewZapLogger creates a new Zap logger adapter
func NewZapLoggerFromSugar
func NewZapLoggerFromSugar(logger *zap.SugaredLogger) *ZapLogger
NewZapLoggerFromSugar creates a logger from an existing sugared logger
Source Files
- auto_indexing.go
- backend.go
- batch.go
- cascades.go
- circuit_breaker.go
- config.go
- counter.go
- distributed_lock.go
- doc.go
- encryption.go
- errors.go
- filesystem_backend.go
- gcs_backend.go
- helpers.go
- id.go
- index_health.go
- index_manager.go
- index_repair.go
- lock_manager.go
- logger.go
- metrics.go
- metrics_exporter.go
- migration.go
- minio_backend.go
- prometheus_metrics.go
- query.go
- query_profiler.go
- redis_config.go
- redis_constraints.go
- redis_indexer.go
- s3_backend.go
- s3_backend_with_lock.go
- store.go
- striped_lock.go
- transaction.go
- zap_logger.go
Directories
| Path | Synopsis |
|---|---|
| examples | |
| ecommerce-orders (command) | |
| event-logging (command) | |
| metrics-dashboard (command) | |
| multi-tenant-config (command) | |
| production-patterns (command) | |
| schema-migrations (command) | |
| simple/01-quickstart (command) | Quickstart: Track your coffee consumption with automatic indexing. |
| simple/02-simple-crud (command) | Simple CRUD demonstrates Create, Read, Update, Delete operations with the Simple API. |
| simple/03-with-indexing (command) | With Indexing demonstrates querying by indexed fields using Redis. |
| simple/04-versioning (command) | |
| user-management (command) | |
| | Package simple provides a high-level, batteries-included API for SmarterBase. |