
Chapter 11: Multi-Vector-Database Management System

javpower
Published 2025-08-06 17:34:37

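11.1 Vector Database Management System Design

11.1.1 System Architecture Overview

A multi-vector-database management system is a core component of an enterprise vector search solution. It must manage vector data across multiple tenants and multiple data sources.

11.1.2 Core Component Design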

Code language: java
package com.jvector.database;

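import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

// Logger and LoggerFactory are assumed to be SLF4J, which matches the {} placeholders used below.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;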

/**
 * Vector database manager.
 */
public class VectorDatabaseManager {
    private static final Logger logger = LoggerFactory.getLogger(VectorDatabaseManager.class);

    private final Map<String, VectorDatabase> databases;
    private final DatabaseRegistry registry;
    private final ResourceManager resourceManager;
    private final SecurityManager securityManager;
    private final ConfigurationManager configManager;

    public VectorDatabaseManager(VectorDatabaseConfig config) {
        this.databases = new ConcurrentHashMap<>();
        this.registry = new DatabaseRegistry();
        this.resourceManager = new ResourceManager(config.getResourceConfig());
        this.securityManager = new SecurityManager(config.getSecurityConfig());
        this.configManager = new ConfigurationManager(config);

        initialize();
    }

    /**
     * Initialize the manager.
     */
    private void initialize() {
        logger.info("Initializing VectorDatabaseManager...");

        // Load databases that already exist
        loadExistingDatabases();

        // Start background tasks
        startBackgroundTasks();

        logger.info("VectorDatabaseManager initialized with {} databases", databases.size());
    }

    /**
     * Create a new vector database.
     */
    public CompletableFuture<VectorDatabase> createDatabase(CreateDatabaseRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            // Check permission
            securityManager.checkPermission(request.getUser(), Permission.CREATE_DATABASE);

            // Validate the database name
            validateDatabaseName(request.getName());

            // Allocate resources
            ResourceAllocation allocation = resourceManager.allocateResources(request.getResourceRequirements());

            try {
                // Create the database instance
                VectorDatabase database = createDatabaseInstance(request, allocation);

                // Register the database
                registry.register(database);
                databases.put(request.getName(), database);

                logger.info("Created database: {}", request.getName());
                return database;

            } catch (Exception e) {
                // Release the resources allocated above
                resourceManager.releaseResources(allocation);
                throw new DatabaseCreationException("Failed to create database: " + request.getName(), e);
            }
        });
    }

    /**
     * Get a database instance.
     */
    public Optional<VectorDatabase> getDatabase(String name, String user) {
        VectorDatabase database = databases.get(name);

        if (database == null) {
            return Optional.empty();
        }

        // Check access permission
        if (!securityManager.hasAccess(user, database)) {
            throw new AccessDeniedException("User " + user + " does not have access to database " + name);
        }

        return Optional.of(database);
    }

    /**
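     * Delete a database.
     */
    public CompletableFuture<Void> deleteDatabase(String name, String user) {
        return CompletableFuture.runAsync(() -> {
            // Check permission
            securityManager.checkPermission(user, Permission.DELETE_DATABASE);

            VectorDatabase database = databases.get(name);
            if (database == null) {
                throw new DatabaseNotFoundException("Database not found: " + name);
            }

            // The global DELETE_DATABASE permission does not show that the user may touch this database
            if (!securityManager.hasAccess(user, database)) {
                throw new AccessDeniedException("User " + user + " does not have access to database " + name);
            }

            try {
                // Stop the database
                database.shutdown();

                // Delete the data files
                database.cleanup();

                // Release resources
                ResourceAllocation allocation = database.getResourceAllocation();
                resourceManager.releaseResources(allocation);

                // Remove from the registry
                registry.unregister(name);
                databases.remove(name);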

                logger.info("Deleted database: {}", name);

            } catch (Exception e) {
                throw new DatabaseDeletionException("Failed to delete database: " + name, e);
            }
        });
    }

    /**
     * List all databases the user can access.
     */
    public List<DatabaseInfo> listDatabases(String user) {
        return databases.values().stream()
            .filter(db -> securityManager.hasAccess(user, db))
            .map(this::createDatabaseInfo)
            .collect(Collectors.toList());
    }

    /**
     * Create a database instance.
     */
    private VectorDatabase createDatabaseInstance(CreateDatabaseRequest request, ResourceAllocation allocation) {
        VectorDatabaseConfig dbConfig = VectorDatabaseConfig.builder()
            .name(request.getName())
            .dimensions(request.getDimensions())
            .distanceMetric(request.getDistanceMetric())
            .indexConfig(request.getIndexConfig())
            .resourceAllocation(allocation)
            .build();

        return new VectorDatabase(dbConfig);
    }

    /**
     * Validate the database name.
     */
    private void validateDatabaseName(String name) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Database name cannot be empty");
        }

        if (databases.containsKey(name)) {
            throw new DatabaseAlreadyExistsException("Database already exists: " + name);
        }

        if (!isValidDatabaseName(name)) {
            throw new IllegalArgumentException("Invalid database name: " + name);
        }
    }

    /**
     * Check whether the database name is well-formed.
     */
    private boolean isValidDatabaseName(String name) {
        return name.matches("^[a-zA-Z][a-zA-Z0-9_-]*$") && name.length() <= 64;
    }
}

/**
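 * Vector database instance.
 * Java allows one public top-level class per file, so this class belongs in VectorDatabase.java.
 */
public class VectorDatabase {
    private static final Logger logger = LoggerFactory.getLogger(VectorDatabase.class);

    private final String name;
    private final HnswIndex index;
    private final VectorDatabaseConfig config;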
    private final ResourceAllocation resourceAllocation;
    private final AtomicReference<DatabaseState> state;
    private final DatabaseMetrics metrics;

    public VectorDatabase(VectorDatabaseConfig config) {
        this.name = config.getName();
        this.config = config;
        this.resourceAllocation = config.getResourceAllocation();
        this.state = new AtomicReference<>(DatabaseState.INITIALIZING);
        this.metrics = new DatabaseMetrics(name);

        // Create the index
        this.index = createIndex(config);

        // Start the database
        startup();
    }

    /**
     * Start the database.
     */
    private void startup() {
        try {
            // Initialize the index
            index.initialize();

            // Load data
            loadData();

            // Start monitoring
            startMonitoring();

            state.set(DatabaseState.RUNNING);
            logger.info("Database {} started successfully", name);

        } catch (Exception e) {
            state.set(DatabaseState.FAILED);
            throw new DatabaseStartupException("Failed to start database: " + name, e);
        }
    }

    /**
     * Add a vector.
     */
    public CompletableFuture<Long> addVector(Vector vector) {
        checkDatabaseState();

        return CompletableFuture.supplyAsync(() -> {
            long startTime = System.nanoTime();

            try {
                long id = index.add(vector);

                // Record metrics
                long duration = System.nanoTime() - startTime;
                metrics.recordAddOperation(duration);

                return id;

            } catch (Exception e) {
                metrics.recordError("add_vector", e);
                throw new VectorOperationException("Failed to add vector", e);
            }
        });
    }

    /**
     * Search for the k nearest vectors.
     */
    public CompletableFuture<List<SearchResult>> search(Vector query, int k, SearchOptions options) {
        checkDatabaseState();

        return CompletableFuture.supplyAsync(() -> {
            long startTime = System.nanoTime();

            try {
                List<SearchResult> results = index.search(query, k, options);

                // Record metrics
                long duration = System.nanoTime() - startTime;
                metrics.recordSearchOperation(duration, results.size());

                return results;

            } catch (Exception e) {
                metrics.recordError("search", e);
                throw new VectorOperationException("Search failed", e);
            }
        });
    }

    /**
     * Check that the database is in the RUNNING state.
     */
    private void checkDatabaseState() {
        DatabaseState currentState = state.get();
        if (currentState != DatabaseState.RUNNING) {
            throw new DatabaseNotReadyException("Database " + name + " is not ready, current state: " + currentState);
        }
    }

    /**
     * Get database statistics.
     */
    public DatabaseStats getStats() {
        return DatabaseStats.builder()
            .name(name)
            .state(state.get())
            .vectorCount(index.size())
            .metrics(metrics.getSnapshot())
            .resourceUsage(getResourceUsage())
            .build();
    }

    /**
     * Shut down the database.
     */
    public void shutdown() {
        logger.info("Shutting down database: {}", name);

        state.set(DatabaseState.SHUTTING_DOWN);

        try {
            // Stop monitoring
            stopMonitoring();

            // Persist data
            saveData();

            // Close the index
            index.close();

            state.set(DatabaseState.STOPPED);
            logger.info("Database {} shut down successfully", name);

        } catch (Exception e) {
            state.set(DatabaseState.FAILED);
            logger.error("Failed to shut down database: " + name, e);
        }
    }
}
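
One detail of `createDatabase` above is worth a closer look: `validateDatabaseName` checks `databases.containsKey(name)`, while the instance is stored later with `databases.put(...)`. Two concurrent requests for the same name can therefore both pass validation. A minimal sketch of one way to close that window is to reserve the name atomically before any expensive work and roll the reservation back on failure. The `NameReservations` helper below is hypothetical and not part of the original code; it only relies on `ConcurrentHashMap.putIfAbsent` and the two-argument `remove`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not from the article): reserves a database name
 * atomically before the expensive creation work starts.
 */
final class NameReservations<T> {
    /** Placeholder stored while creation is still in progress. */
    private static final Object PENDING = new Object();

    private final ConcurrentMap<String, Object> entries = new ConcurrentHashMap<>();

    /** Creates and stores a value under the name, or throws if the name is already taken. */
    T createIfAbsent(String name, Supplier<T> factory) {
        // putIfAbsent is atomic: for each name, exactly one caller sees null here.
        if (entries.putIfAbsent(name, PENDING) != null) {
            throw new IllegalStateException("Database already exists: " + name);
        }
        try {
            T value = factory.get();
            entries.put(name, value); // swap the placeholder for the real instance
            return value;
        } catch (RuntimeException e) {
            entries.remove(name, PENDING); // roll back so the name can be reused
            throw e;
        }
    }

    /** Returns the stored value, or null if the name is absent or still being created. */
    @SuppressWarnings("unchecked")
    T get(String name) {
        Object value = entries.get(name);
        return value == PENDING ? null : (T) value;
    }
}
```

In the manager, the factory passed to `createIfAbsent` would wrap the resource allocation and `createDatabaseInstance` call, so a failed creation both releases resources and frees the name.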

11.2 Multi-Database Operation API

11.2.1 Unified API Interface

Code language: java
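package com.jvector.api;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.jvector.database.VectorDatabase;
import com.jvector.database.VectorDatabaseManager;

// The annotations below are Spring Web MVC annotations, so these imports are assumed.
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;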

/**
 * API for operations on multiple vector databases.
 */
@RestController
@RequestMapping("/api/v1/databases")
public class VectorDatabaseController {

    private final VectorDatabaseManager databaseManager;
    private final RequestValidator validator;
    private final ResponseConverter converter;

    public VectorDatabaseController(VectorDatabaseManager databaseManager) {
        this.databaseManager = databaseManager;
        this.validator = new RequestValidator();
        this.converter = new ResponseConverter();
    }

    /**
     * Create a database.
     */
    @PostMapping
    public ResponseEntity<DatabaseResponse> createDatabase(
            @RequestBody CreateDatabaseRequest request,
            @RequestHeader("X-User-ID") String userId) {

        validator.validate(request);

        return databaseManager.createDatabase(request.withUser(userId))
            .thenApply(db -> ResponseEntity.ok(converter.toResponse(db)))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(converter.toErrorResponse(ex)))
            .join();
    }

    /**
     * List databases.
     */
    @GetMapping
    public ResponseEntity<List<DatabaseInfo>> listDatabases(
            @RequestHeader("X-User-ID") String userId) {

        List<DatabaseInfo> databases = databaseManager.listDatabases(userId);
        return ResponseEntity.ok(databases);
    }

    /**
     * Get database information.
     */
    @GetMapping("/{dbName}")
    public ResponseEntity<DatabaseInfo> getDatabaseInfo(
            @PathVariable String dbName,
            @RequestHeader("X-User-ID") String userId) {

        Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);

        if (database.isPresent()) {
            DatabaseInfo info = converter.toDatabaseInfo(database.get());
            return ResponseEntity.ok(info);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    /**
     * Delete a database.
     */
    @DeleteMapping("/{dbName}")
    public ResponseEntity<Void> deleteDatabase(
            @PathVariable String dbName,
            @RequestHeader("X-User-ID") String userId) {

        return databaseManager.deleteDatabase(dbName, userId)
            .thenApply(v -> ResponseEntity.ok().<Void>build())
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }

    /**
     * Add a vector.
     */
    @PostMapping("/{dbName}/vectors")
    public ResponseEntity<VectorResponse> addVector(
            @PathVariable String dbName,
            @RequestBody AddVectorRequest request,
            @RequestHeader("X-User-ID") String userId) {

        validator.validate(request);

        Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);
        if (!database.isPresent()) {
            return ResponseEntity.notFound().build();
        }

        return database.get().addVector(request.getVector())
            .thenApply(id -> ResponseEntity.ok(new VectorResponse(id)))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }

    /**
     * Add vectors in a batch.
     */
    @PostMapping("/{dbName}/vectors/batch")
    public ResponseEntity<BatchVectorResponse> addVectorsBatch(
            @PathVariable String dbName,
            @RequestBody BatchAddVectorRequest request,
            @RequestHeader("X-User-ID") String userId) {

        validator.validate(request);

        Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);
        if (!database.isPresent()) {
            return ResponseEntity.notFound().build();
        }

        List<CompletableFuture<Long>> futures = request.getVectors().stream()
            .map(database.get()::addVector)
            .collect(Collectors.toList());

        CompletableFuture<List<Long>> allFutures = 
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList()));

        return allFutures
            .thenApply(ids -> ResponseEntity.ok(new BatchVectorResponse(ids)))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }

    /**
     * Search vectors.
     */
    @PostMapping("/{dbName}/search")
    public ResponseEntity<SearchResponse> search(
            @PathVariable String dbName,
            @RequestBody SearchRequest request,
            @RequestHeader("X-User-ID") String userId) {

        validator.validate(request);

        Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);
        if (!database.isPresent()) {
            return ResponseEntity.notFound().build();
        }

        SearchOptions options = converter.toSearchOptions(request);

        return database.get().search(request.getQuery(), request.getK(), options)
            .thenApply(results -> ResponseEntity.ok(new SearchResponse(results)))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }

    /**
     * Get database statistics.
     */
    @GetMapping("/{dbName}/stats")
    public ResponseEntity<DatabaseStats> getDatabaseStats(
            @PathVariable String dbName,
            @RequestHeader("X-User-ID") String userId) {

        Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);
        if (!database.isPresent()) {
            return ResponseEntity.notFound().build();
        }

        DatabaseStats stats = database.get().getStats();
        return ResponseEntity.ok(stats);
    }
}

/**
 * Cross-database search API.
 */
@RestController
@RequestMapping("/api/v1/cross-database")
public class CrossDatabaseController {

    private final CrossDatabaseSearchService searchService;

    public CrossDatabaseController(CrossDatabaseSearchService searchService) {
        this.searchService = searchService;
    }

    /**
     * Search across databases.
     */
    @PostMapping("/search")
    public ResponseEntity<CrossSearchResponse> crossSearch(
            @RequestBody CrossSearchRequest request,
            @RequestHeader("X-User-ID") String userId) {

        return searchService.search(request, userId)
            .thenApply(results -> ResponseEntity.ok(results))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }

    /**
     * Run a batch of operations across databases.
     */
    @PostMapping("/batch")
    public ResponseEntity<BatchOperationResponse> batchOperation(
            @RequestBody BatchOperationRequest request,
            @RequestHeader("X-User-ID") String userId) {

        return searchService.batchOperation(request, userId)
            .thenApply(results -> ResponseEntity.ok(results))
            .exceptionally(ex -> ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build())
            .join();
    }
}
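
The controllers above answer every asynchronous failure with 500, and `getDatabase` throws `AccessDeniedException` straight out of the handler. A client therefore cannot tell a missing database from a permission problem or a server fault. The sketch below shows one possible mapping from failure type to status code. It is an illustration under assumptions: the exception classes are local stand-ins named after the article's types, `StatusMapper` is a hypothetical helper, and status codes are plain `int` values so the example runs without Spring.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Local stand-ins for the article's exception types so the sketch compiles on its own.
class DatabaseNotFoundException extends RuntimeException {
    DatabaseNotFoundException(String message) { super(message); }
}

class AccessDeniedException extends RuntimeException {
    AccessDeniedException(String message) { super(message); }
}

class DatabaseAlreadyExistsException extends RuntimeException {
    DatabaseAlreadyExistsException(String message) { super(message); }
}

/** Hypothetical helper: translates a failure into an HTTP status code. */
final class StatusMapper {
    private StatusMapper() {}

    /** Strips the CompletionException wrappers that CompletableFuture adds around failures. */
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static int toHttpStatus(Throwable error) {
        Throwable cause = unwrap(error);
        if (cause instanceof DatabaseNotFoundException) return 404;      // Not Found
        if (cause instanceof AccessDeniedException) return 403;          // Forbidden
        if (cause instanceof DatabaseAlreadyExistsException) return 409; // Conflict
        if (cause instanceof IllegalArgumentException) return 400;       // Bad Request
        return 500;                                                      // anything else
    }

    /** Example wiring: resolve a future to a status code instead of a blanket 500. */
    static int statusOf(CompletableFuture<?> operation) {
        return operation.thenApply(ok -> 200)
                        .exceptionally(StatusMapper::toHttpStatus)
                        .join();
    }
}
```

In a controller, the same mapping would sit inside the `exceptionally` callback, for example `ResponseEntity.status(StatusMapper.toHttpStatus(ex))`. Failures that the manager wraps in `DatabaseCreationException` or `DatabaseDeletionException` would still fall through to 500 under this mapping.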

11.2.2 Cross-Database Query Implementation

Code language: java
/**
 * Cross-database search service.
 */
public class CrossDatabaseSearchService {
    private static final Logger logger = LoggerFactory.getLogger(CrossDatabaseSearchService.class);

    private final VectorDatabaseManager databaseManager;
    private final ExecutorService executorService;
    private final ResultMerger resultMerger;

    public CrossDatabaseSearchService(VectorDatabaseManager databaseManager) {
        this.databaseManager = databaseManager;
        this.executorService = ForkJoinPool.commonPool();
        this.resultMerger = new ResultMerger();
    }

    /**
     * Search across databases.
     */
    public CompletableFuture<CrossSearchResponse> search(CrossSearchRequest request, String userId) {
        List<String> databaseNames = request.getDatabases();
        Vector query = request.getQuery();
        int k = request.getK();
        SearchOptions options = request.getOptions();

        // Search all databases in parallel
        List<CompletableFuture<DatabaseSearchResult>> searchFutures = databaseNames.stream()
            .map(dbName -> searchSingleDatabase(dbName, query, k, options, userId))
            .collect(Collectors.toList());

        // Wait for every search to finish, then merge the results
        return CompletableFuture.allOf(searchFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<DatabaseSearchResult> results = searchFutures.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

                // Merge the results
                List<SearchResult> mergedResults = resultMerger.merge(results, k);

                return new CrossSearchResponse(mergedResults, results);
            });
    }

    /**
     * Search a single database.
     */
    private CompletableFuture<DatabaseSearchResult> searchSingleDatabase(
            String dbName, Vector query, int k, SearchOptions options, String userId) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                Optional<VectorDatabase> database = databaseManager.getDatabase(dbName, userId);
                if (!database.isPresent()) {
                    // getDatabase throws on access denial, so an empty Optional means "not found"
                    logger.warn("Database not found: {}", dbName);
                    return null;
                }

                long startTime = System.nanoTime();
                List<SearchResult> results = database.get().search(query, k, options).join();
                long duration = System.nanoTime() - startTime;

                return new DatabaseSearchResult(dbName, results, duration);

            } catch (Exception e) {
                logger.error("Failed to search database: {}", dbName, e);
                return new DatabaseSearchResult(dbName, Collections.emptyList(), 0, e);
            }
        }, executorService);
    }

    /**
     * Run a batch of operations.
     */
    public CompletableFuture<BatchOperationResponse> batchOperation(
            BatchOperationRequest request, String userId) {

        List<CompletableFuture<OperationResult>> operationFutures = 
            request.getOperations().stream()
                .map(op -> executeSingleOperation(op, userId))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(operationFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<OperationResult> results = operationFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());

                return new BatchOperationResponse(results);
            });
    }

    /**
     * Execute a single operation.
     */
    private CompletableFuture<OperationResult> executeSingleOperation(
            DatabaseOperation operation, String userId) {

        return CompletableFuture.supplyAsync(() -> {
            try {
                Optional<VectorDatabase> database = 
                    databaseManager.getDatabase(operation.getDatabaseName(), userId);

                if (!database.isPresent()) {
                    return OperationResult.error("Database not found: " + operation.getDatabaseName());
                }

                return executeOperation(database.get(), operation);

            } catch (Exception e) {
                return OperationResult.error("Operation failed: " + e.getMessage());
            }
        }, executorService);
    }

    /**
     * Dispatch on the operation type and execute it.
     */
    private OperationResult executeOperation(VectorDatabase database, DatabaseOperation operation) {
        switch (operation.getType()) {
            case ADD_VECTOR:
                AddVectorOperation addOp = (AddVectorOperation) operation;
                Long id = database.addVector(addOp.getVector()).join();
                return OperationResult.success(id);

            case SEARCH:
                SearchOperation searchOp = (SearchOperation) operation;
                List<SearchResult> results = database.search(
                    searchOp.getQuery(), 
                    searchOp.getK(), 
                    searchOp.getOptions()
                ).join();
                return OperationResult.success(results);

            case DELETE_VECTOR:
                DeleteVectorOperation deleteOp = (DeleteVectorOperation) operation;
                boolean deleted = database.deleteVector(deleteOp.getId()).join();
                return OperationResult.success(deleted);

            default:
                return OperationResult.error("Unsupported operation type: " + operation.getType());
        }
    }
}

/**
 * Result merger.
 */
public class ResultMerger {

    /**
     * Merge search results from multiple databases.
     */
    public List<SearchResult> merge(List<DatabaseSearchResult> results, int k) {
        // Collect all results
        List<SearchResult> allResults = results.stream()
            .flatMap(dbResult -> dbResult.getResults().stream())
            .collect(Collectors.toList());

        // Sort by distance and keep the first k
        return allResults.stream()
            .sorted(Comparator.comparing(SearchResult::getDistance))
            .limit(k)
            .collect(Collectors.toList());
    }

    /**
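     * Weighted merge (accounts for performance differences between databases).
     */
    public List<SearchResult> mergeWithWeights(List<DatabaseSearchResult> results, 
                                             Map<String, Double> databaseWeights, int k) {

        List<WeightedSearchResult> weightedResults = new ArrayList<>();

        for (DatabaseSearchResult dbResult : results) {
            String dbName = dbResult.getDatabaseName();
            double weight = databaseWeights.getOrDefault(dbName, 1.0);
            if (weight <= 0) {
                // A zero or negative weight would produce infinite or sign-flipped distances
                throw new IllegalArgumentException("Weight must be positive for database: " + dbName);
            }

            for (SearchResult result : dbResult.getResults()) {
                // Divide by the weight: a larger weight shrinks the distance and ranks the result higher
                float adjustedDistance = (float) (result.getDistance() / weight);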
                WeightedSearchResult weighted = new WeightedSearchResult(
                    result, adjustedDistance, dbName);
                weightedResults.add(weighted);
            }
        }

        // Sort by adjusted distance
        return weightedResults.stream()
            .sorted(Comparator.comparing(WeightedSearchResult::getAdjustedDistance))
            .limit(k)
            .map(WeightedSearchResult::getOriginalResult)
            .collect(Collectors.toList());
    }
}
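
`ResultMerger.merge` above gathers every result from every database and sorts the whole list, which costs O(n log n) for n total results even though only k survive. When many databases are queried, a bounded max-heap keeps the work at O(n log k). The sketch below illustrates that alternative. `Hit` is a hypothetical stand-in for the article's `SearchResult`, and smaller distances are assumed to mean closer matches.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Stand-in for the article's SearchResult: an id plus a distance (smaller is closer). */
final class Hit {
    final long id;
    final float distance;

    Hit(long id, float distance) {
        this.id = id;
        this.distance = distance;
    }
}

/** Hypothetical top-k merger that never holds more than k hits at once. */
final class TopKMerger {
    private TopKMerger() {}

    static List<Hit> merge(List<List<Hit>> perDatabase, int k) {
        if (k <= 0) {
            return new ArrayList<>();
        }
        // Max-heap on distance: the head is the worst hit currently kept.
        PriorityQueue<Hit> heap = new PriorityQueue<>(
            Comparator.comparingDouble((Hit h) -> h.distance).reversed());

        for (List<Hit> hits : perDatabase) {
            for (Hit hit : hits) {
                if (heap.size() < k) {
                    heap.add(hit);
                } else if (hit.distance < heap.peek().distance) {
                    heap.poll();   // evict the current worst
                    heap.add(hit); // keep the closer hit instead
                }
            }
        }

        // Return the survivors in ascending distance order.
        List<Hit> merged = new ArrayList<>(heap);
        merged.sort(Comparator.comparingDouble(h -> h.distance));
        return merged;
    }
}
```

For small fan-out the full sort in the article is simpler and fast enough; the heap only pays off when the combined result count is much larger than k.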

11.3 Data Isolation and Security Mechanisms

11.3.1 Multi-Tenant Data Isolation

Code language: java
/**
 * Multi-tenant manager.
 */
public class MultiTenantManager {
    private static final Logger logger = LoggerFactory.getLogger(MultiTenantManager.class);

    private final Map<String, Tenant> tenants;
    private final TenantIsolationStrategy isolationStrategy;
    private final ResourceQuotaManager quotaManager;

    public MultiTenantManager(MultiTenantConfig config) {
        this.tenants = new ConcurrentHashMap<>();
        this.isolationStrategy = createIsolationStrategy(config.getIsolationLevel());
        this.quotaManager = new ResourceQuotaManager();
    }

    /**
     * Create a tenant.
     */
    public Tenant createTenant(CreateTenantRequest request) {
        validateTenantRequest(request);

        Tenant tenant = Tenant.builder()
            .id(request.getTenantId())
            .name(request.getName())
            .isolationLevel(request.getIsolationLevel())
            .resourceQuota(request.getResourceQuota())
            .securityPolicy(request.getSecurityPolicy())
            .build();

        // Set up the isolation environment
        isolationStrategy.setupIsolation(tenant);

        // Set the resource quota
        quotaManager.setQuota(tenant.getId(), tenant.getResourceQuota());

        tenants.put(tenant.getId(), tenant);

        logger.info("Created tenant: {}", tenant.getId());
        return tenant;
    }

    /**
     * Get the database accessor for a tenant.
     */
    public TenantDatabaseAccessor getDatabaseAccessor(String tenantId) {
        Tenant tenant = tenants.get(tenantId);
        if (tenant == null) {
            throw new TenantNotFoundException("Tenant not found: " + tenantId);
        }

        return new TenantDatabaseAccessor(tenant, isolationStrategy, quotaManager);
    }

    /**
     * Create the isolation strategy.
     */
    private TenantIsolationStrategy createIsolationStrategy(IsolationLevel level) {
        switch (level) {
            case NAMESPACE:
                return new NamespaceIsolationStrategy();
            case DATABASE:
                return new DatabaseIsolationStrategy();
            case PHYSICAL:
                return new PhysicalIsolationStrategy();
            default:
                throw new IllegalArgumentException("Unsupported isolation level: " + level);
        }
    }
}

/**
 * Tenant database accessor.
 */
public class TenantDatabaseAccessor {
    private final Tenant tenant;
    private final TenantIsolationStrategy isolationStrategy;
    // createDatabase below needs the quota manager, so it is passed in by MultiTenantManager
    private final ResourceQuotaManager quotaManager;
    private final Map<String, VectorDatabase> isolatedDatabases;

    public TenantDatabaseAccessor(Tenant tenant, TenantIsolationStrategy isolationStrategy,
                                  ResourceQuotaManager quotaManager) {
        this.tenant = tenant;
        this.isolationStrategy = isolationStrategy;
        this.quotaManager = quotaManager;
        this.isolatedDatabases = new ConcurrentHashMap<>();
    }

    /**
     * Get a tenant's database, creating it on first access.
     */
    public VectorDatabase getDatabase(String databaseName) {
        String isolatedName = isolationStrategy.getIsolatedDatabaseName(tenant.getId(), databaseName);

        return isolatedDatabases.computeIfAbsent(isolatedName, name -> {
            return isolationStrategy.createIsolatedDatabase(tenant, databaseName);
        });
    }

    /**
     * Create a tenant database.
     */
    public VectorDatabase createDatabase(CreateDatabaseRequest request) {
        // Check the resource quota
        quotaManager.checkQuota(tenant.getId(), request.getResourceRequirements());

        // Create the isolated database
        VectorDatabase database = isolationStrategy.createIsolatedDatabase(tenant, request.getName());

        String isolatedName = isolationStrategy.getIsolatedDatabaseName(tenant.getId(), request.getName());
        isolatedDatabases.put(isolatedName, database);

        return database;
    }
}

/**
 * Namespace isolation strategy.
 */
public class NamespaceIsolationStrategy implements TenantIsolationStrategy {

    @Override
    public void setupIsolation(Tenant tenant) {
        // Create the tenant namespace
        String namespace = createNamespace(tenant.getId());
        tenant.setNamespace(namespace);
    }

    @Override
    public String getIsolatedDatabaseName(String tenantId, String databaseName) {
        return tenantId + "_" + databaseName;
    }

    @Override
    public VectorDatabase createIsolatedDatabase(Tenant tenant, String databaseName) {
        String isolatedName = getIsolatedDatabaseName(tenant.getId(), databaseName);

        VectorDatabaseConfig config = VectorDatabaseConfig.builder()
            .name(isolatedName)
            .namespace(tenant.getNamespace())
            .securityPolicy(tenant.getSecurityPolicy())
            .resourceLimits(tenant.getResourceQuota())
            .build();

        return new VectorDatabase(config);
    }

    private String createNamespace(String tenantId) {
        return "tenant_" + tenantId + "_" + System.currentTimeMillis();
    }
}

/**
 * Database-level isolation strategy.
 */
public class DatabaseIsolationStrategy implements TenantIsolationStrategy {

    @Override
    public void setupIsolation(Tenant tenant) {
        // Create a dedicated database instance pool for the tenant
        DatabaseInstancePool pool = new DatabaseInstancePool(tenant.getResourceQuota());
        tenant.setDatabasePool(pool);
    }

    @Override
    public String getIsolatedDatabaseName(String tenantId, String databaseName) {
        return databaseName; // The name is unchanged; the database runs in an isolated instance
    }

    @Override
    public VectorDatabase createIsolatedDatabase(Tenant tenant, String databaseName) {
        DatabaseInstancePool pool = tenant.getDatabasePool();

        VectorDatabaseConfig config = VectorDatabaseConfig.builder()
            .name(databaseName)
            .instancePool(pool)
            .securityPolicy(tenant.getSecurityPolicy())
            .build();

        return pool.createDatabase(config);
    }
}
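
The `tenantId + "_" + databaseName` scheme in `NamespaceIsolationStrategy` can produce the same isolated name for two different tenants, because `_` is itself a legal character under the pattern used by `isValidDatabaseName`. Tenant `acme` with database `eu_docs` and tenant `acme_eu` with database `docs` both map to `acme_eu_docs`. One possible remedy, sketched below, is to join the two parts with a character that the pattern excludes. `TenantScopedName` is a hypothetical helper, and applying the database-name pattern to tenant IDs is an assumption; the article does not say how tenant IDs are validated.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: builds collision-free tenant-scoped database names. */
final class TenantScopedName {
    // Same pattern as isValidDatabaseName in section 11.1.2.
    // Assumption: tenant IDs follow the same rule.
    private static final Pattern PART = Pattern.compile("^[a-zA-Z][a-zA-Z0-9_-]*$");

    // '.' cannot appear in either part, so the split point is unambiguous.
    private static final char SEPARATOR = '.';

    private TenantScopedName() {}

    static String of(String tenantId, String databaseName) {
        requireValid("tenant id", tenantId);
        requireValid("database name", databaseName);
        return tenantId + SEPARATOR + databaseName;
    }

    /** True only when the part before the separator equals the tenant id exactly. */
    static boolean belongsTo(String tenantId, String scopedName) {
        int split = scopedName.indexOf(SEPARATOR);
        return split > 0 && scopedName.substring(0, split).equals(tenantId);
    }

    private static void requireValid(String label, String value) {
        if (value == null || !PART.matcher(value).matches()) {
            throw new IllegalArgumentException("Invalid " + label + ": " + value);
        }
    }
}
```

An exact comparison such as `belongsTo` also avoids the false positive that a `startsWith(tenantId + "_")` prefix test gives when one tenant ID is a prefix of another.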

11.3.2 Permission Control System

Code language: java
/**
 * Permission management system.
 */
public class PermissionManager {
    private final Map<String, User> users;
    private final Map<String, Role> roles;
    private final Map<String, List<Permission>> userPermissions;
    private final PermissionEvaluator evaluator;

    public PermissionManager() {
        this.users = new ConcurrentHashMap<>();
        this.roles = new ConcurrentHashMap<>();
        this.userPermissions = new ConcurrentHashMap<>();
        this.evaluator = new PermissionEvaluator();
    }

    /**
     * Create a user.
     */
    public User createUser(CreateUserRequest request) {
        User user = User.builder()
            .id(request.getUserId())
            .name(request.getName())
            .email(request.getEmail())
            .tenantId(request.getTenantId())
            .roles(request.getRoles())
            .build();

        users.put(user.getId(), user);

        // Compute the user's permissions
        updateUserPermissions(user);

        return user;
    }

    /**
     * Check a permission.
     */
    public boolean hasPermission(String userId, String resource, String action) {
        List<Permission> permissions = userPermissions.get(userId);
        if (permissions == null) {
            return false;
        }

        return evaluator.evaluate(permissions, resource, action);
    }

    /**
     * Check whether the user may access a database.
     */
    public boolean canAccessDatabase(String userId, String databaseName) {
        User user = users.get(userId);
        if (user == null) {
            return false;
        }

        // Check tenant isolation
        if (!isDatabaseInUserTenant(user, databaseName)) {
            return false;
        }

        // Check the specific permission
        return hasPermission(userId, "database:" + databaseName, "read");
    }

    /**
     * Recompute and cache the user's permissions.
     */
    private void updateUserPermissions(User user) {
        List<Permission> allPermissions = new ArrayList<>();

        // Collect permissions from the user's roles
        for (String roleId : user.getRoles()) {
            Role role = roles.get(roleId);
            if (role != null) {
                allPermissions.addAll(role.getPermissions());
            }
        }

        // Deduplicate and cache
        List<Permission> uniquePermissions = allPermissions.stream()
            .distinct()
            .collect(Collectors.toList());

        userPermissions.put(user.getId(), uniquePermissions);
    }

    /**
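     * Check whether the database belongs to the user's tenant.
     */
    private boolean isDatabaseInUserTenant(User user, String databaseName) {
        // Simplified implementation: check the database-name prefix.
        // This only holds for the tenantId_ prefix produced by NamespaceIsolationStrategy;
        // DatabaseIsolationStrategy leaves names unchanged, so this check would reject them.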
        return databaseName.startsWith(user.getTenantId() + "_");
    }
}

/**
 * Permission evaluator.
 */
public class PermissionEvaluator {

    /**
     * Evaluate whether any permission grants the action on the resource.
     */
    public boolean evaluate(List<Permission> permissions, String resource, String action) {
        for (Permission permission : permissions) {
            if (matches(permission, resource, action)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Check whether a permission matches.
     */
    private boolean matches(Permission permission, String resource, String action) {
        return matchesResource(permission.getResource(), resource) &&
               matchesAction(permission.getAction(), action);
    }

    /**
     * Check whether the resource matches.
     */
    private boolean matchesResource(String permissionResource, String requestedResource) {
        if (permissionResource.equals("*")) {
            return true; // Wildcard matches every resource
        }

        if (permissionResource.endsWith("*")) {
            String prefix = permissionResource.substring(0, permissionResource.length() - 1);
            return requestedResource.startsWith(prefix);
        }

        return permissionResource.equals(requestedResource);
    }

    /**
     * Check whether the action matches.
     */
    private boolean matchesAction(String permissionAction, String requestedAction) {
        if (permissionAction.equals("*")) {
            return true; // Wildcard matches every action
        }

        return permissionAction.equals(requestedAction);
    }
}
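
To make the matching rules concrete, the sketch below restates the three resource rules used by `PermissionEvaluator` (a bare `*`, a trailing `*` prefix match, and an exact match) and runs them against a small grant list. `Grant` and `GrantCheck` are hypothetical stand-ins for the article's `Permission` and evaluator, and the resource strings are illustrative values only.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-in for the article's Permission: a resource pattern plus an action. */
final class Grant {
    final String resource;
    final String action;

    Grant(String resource, String action) {
        this.resource = resource;
        this.action = action;
    }
}

/** Hypothetical restatement of the evaluator's matching rules. */
final class GrantCheck {
    private GrantCheck() {}

    static boolean allows(List<Grant> grants, String resource, String action) {
        for (Grant grant : grants) {
            if (matchesResource(grant.resource, resource) && matchesAction(grant.action, action)) {
                return true;
            }
        }
        return false;
    }

    private static boolean matchesResource(String pattern, String requested) {
        if (pattern.endsWith("*")) {
            // A bare "*" leaves an empty prefix, which every resource starts with.
            String prefix = pattern.substring(0, pattern.length() - 1);
            return requested.startsWith(prefix);
        }
        return pattern.equals(requested);
    }

    private static boolean matchesAction(String pattern, String requested) {
        return pattern.equals("*") || pattern.equals(requested);
    }

    /** Illustrative grants: read on all tenantA databases, everything on one of them. */
    static List<Grant> sampleGrants() {
        return Arrays.asList(
            new Grant("database:tenantA_*", "read"),
            new Grant("database:tenantA_docs", "*"));
    }
}
```

With these grants, a read on `database:tenantA_images` is allowed by the prefix rule, a write on the same resource is not, and a write on `database:tenantA_docs` is allowed by the action wildcard. Note that the model is allow-only: there is no way to express a deny that overrides a broader grant.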

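Summary

This chapter covered the design and implementation of a multi-vector-database management system:

  1. System architecture
    • Layered management architecture
    • Unified database management interface
    • Resource pooling and scheduling
  2. Multi-database operations
    • RESTful API design
    • Cross-database query optimization
    • Batch operation support
  3. Data isolation
    • Multi-tenant isolation strategies
    • Namespace isolation
    • Physical isolation
  4. Security control
    • Role-based access control
    • Fine-grained permission evaluation
    • Tenant data isolation

Together, these pieces give enterprise vector search applications a multi-tenant, multi-database management layer.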


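Exercises:

  1. How would you design a vector database cluster that can scale out and in dynamically?
  2. How should a cross-database query handle performance differences between databases?
  3. How can vector data be synchronized in real time with consistency guarantees?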
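
As one possible starting point for question 2, a cross-database search can bound how long it waits for each database and merge whatever has arrived by then, so a single slow or failing database does not stall the whole query. The sketch below is an illustration under assumptions: it needs Java 9 or later for `CompletableFuture.completeOnTimeout`, and `BoundedFanOut` is a hypothetical name that does not appear in the chapter's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: gathers per-database results with a per-database time budget. */
final class BoundedFanOut {
    private BoundedFanOut() {}

    static <T> List<T> gather(List<CompletableFuture<List<T>>> searches, long timeoutMillis) {
        List<CompletableFuture<List<T>>> bounded = new ArrayList<>();
        for (CompletableFuture<List<T>> search : searches) {
            // completeOnTimeout completes the given future with the fallback value once
            // the budget elapses; all timers start here, so they run concurrently.
            bounded.add(search
                .completeOnTimeout(Collections.emptyList(), timeoutMillis, TimeUnit.MILLISECONDS)
                // A failed database contributes no results instead of failing the query.
                .exceptionally(error -> Collections.emptyList()));
        }

        List<T> merged = new ArrayList<>();
        for (CompletableFuture<List<T>> future : bounded) {
            merged.addAll(future.join()); // never blocks much longer than the budget
        }
        return merged;
    }
}
```

A real service would probably also report which databases timed out or failed, so that callers know the result set is partial.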

Originally published 2025-07-29 on the WeChat official account Coder建设.
