diff --git a/.env.example b/.env.example index e19346c4bf70..ce03715b5ef0 100644 --- a/.env.example +++ b/.env.example @@ -370,10 +370,30 @@ ZAPIER_NLA_API_KEY= #==================================================# SEARCH=true + +# Search Provider: 'meilisearch' (default), 'opensearch', or 'typesense' +# If not set, auto-detected from available env vars below. +# SEARCH_PROVIDER=meilisearch + +#------- MeiliSearch (default) -------# MEILI_NO_ANALYTICS=true MEILI_HOST=http://0.0.0.0:7700 MEILI_MASTER_KEY=DrhYf7zENyR6AlUCKmnz0eYASOQdl6zxH7s7MKFSfFCt +#------- OpenSearch -------# +# Uncomment and configure to use OpenSearch instead of MeiliSearch. +# Setting OPENSEARCH_HOST will auto-select OpenSearch as the provider. +# OPENSEARCH_HOST=https://localhost:9200 +# OPENSEARCH_USERNAME=admin +# OPENSEARCH_PASSWORD= +# OPENSEARCH_INSECURE=true + +#------- Typesense -------# +# Uncomment and configure to use Typesense instead of MeiliSearch. +# Setting TYPESENSE_HOST + TYPESENSE_API_KEY will auto-select Typesense as the provider. +# TYPESENSE_HOST=http://localhost:8108 +# TYPESENSE_API_KEY= + # Optional: Disable indexing, useful in a multi-node setup # where only one instance should perform an index sync. 
# MEILI_NO_SYNC=true diff --git a/api/db/indexSync.js b/api/db/indexSync.js index 8e8e999d9218..335993baf11d 100644 --- a/api/db/indexSync.js +++ b/api/db/indexSync.js @@ -1,6 +1,6 @@ const mongoose = require('mongoose'); const { MeiliSearch } = require('meilisearch'); -const { logger } = require('@librechat/data-schemas'); +const { logger, getSearchProvider, detectSearchProvider } = require('@librechat/data-schemas'); const { CacheKeys } = require('librechat-data-provider'); const { isEnabled, FlowStateManager } = require('@librechat/api'); const { getLogStores } = require('~/cache'); @@ -188,7 +188,126 @@ async function ensureFilterableAttributes(client) { } /** - * Performs the actual sync operations for messages and conversations + * Ensures indexes have proper filterable attributes for non-MeiliSearch providers. + * @param {import('./search/searchProvider').SearchProvider} provider - Search provider instance + * @returns {Promise<{settingsUpdated: boolean, orphanedDocsFound: boolean}>} + */ +async function ensureFilterableAttributesGeneric(provider) { + let settingsUpdated = false; + let hasOrphanedDocs = false; + + try { + for (const indexName of ['messages', 'convos']) { + try { + const settings = await provider.getIndexSettings(indexName); + const filterableAttrs = settings.filterableAttributes || []; + + if (!filterableAttrs.includes('user')) { + logger.info(`[indexSync] Configuring ${indexName} index to filter by user...`); + await provider.updateIndexSettings(indexName, { + filterableAttributes: ['user'], + }); + logger.info(`[indexSync] ${indexName} index configured for user filtering`); + settingsUpdated = true; + } + + // Check for orphaned documents + try { + const searchResult = await provider.search(indexName, '', { limit: 1 }); + if (searchResult.hits.length > 0 && !searchResult.hits[0].user) { + logger.info( + `[indexSync] Existing ${indexName} missing user field, will clean up orphaned documents...`, + ); + hasOrphanedDocs = true; + } + } catch 
(searchError) { + logger.debug(`[indexSync] Could not check ${indexName} documents:`, searchError.message); + } + } catch (error) { + logger.warn( + `[indexSync] Could not check/update ${indexName} index settings:`, + error.message, + ); + } + } + + if (hasOrphanedDocs) { + for (const indexName of ['messages', 'convos']) { + try { + await deleteDocumentsWithoutUserFieldGeneric(provider, indexName); + } catch (error) { + logger.debug(`[indexSync] Could not clean up ${indexName}:`, error.message); + } + } + logger.info('[indexSync] Orphaned documents cleaned up without forcing resync.'); + } + + if (settingsUpdated) { + logger.info('[indexSync] Index settings updated. Full re-sync will be triggered.'); + } + } catch (error) { + logger.error('[indexSync] Error ensuring filterable attributes:', error); + } + + return { settingsUpdated, orphanedDocsFound: hasOrphanedDocs }; +} + +/** + * Deletes documents without user field from a search index (generic provider version). + * @param {import('./search/searchProvider').SearchProvider} provider + * @param {string} indexName + * @returns {Promise} + */ +async function deleteDocumentsWithoutUserFieldGeneric(provider, indexName) { + let deletedCount = 0; + let offset = 0; + const batchSize = 1000; + const primaryKey = indexName === 'messages' ? 
'messageId' : 'conversationId'; + + try { + while (true) { + const searchResult = await provider.search(indexName, '', { + limit: batchSize, + offset: offset, + }); + + if (searchResult.hits.length === 0) { + break; + } + + const idsToDelete = searchResult.hits + .filter((hit) => !hit.user) + .map((hit) => hit.id || hit[primaryKey]) + .filter(Boolean); + + if (idsToDelete.length > 0) { + logger.info( + `[indexSync] Deleting ${idsToDelete.length} documents without user field from ${indexName} index`, + ); + await provider.deleteDocuments(indexName, idsToDelete); + deletedCount += idsToDelete.length; + } + + if (searchResult.hits.length < batchSize) { + break; + } + + offset += batchSize; + } + + if (deletedCount > 0) { + logger.info(`[indexSync] Deleted ${deletedCount} orphaned documents from ${indexName} index`); + } + } catch (error) { + logger.error(`[indexSync] Error deleting documents from ${indexName}:`, error); + } + + return deletedCount; +} + +/** + * Performs the actual sync operations for messages and conversations. + * Supports both MeiliSearch (legacy) and generic search providers (OpenSearch, etc.). * @param {FlowStateManager} flowManager - Flow state manager instance * @param {string} flowId - Flow identifier * @param {string} flowType - Flow type @@ -200,16 +319,39 @@ async function performSync(flowManager, flowId, flowType) { return { messagesSync: false, convosSync: false }; } - const client = MeiliSearchClient.getInstance(); + const providerType = detectSearchProvider ? 
detectSearchProvider() : 'meilisearch'; + let settingsUpdated = false; + let _orphanedDocsFound = false; - const { status } = await client.health(); - if (status !== 'available') { - throw new Error('Meilisearch not available'); - } + if (providerType === 'meilisearch') { + // Legacy MeiliSearch path — fully backward compatible + const client = MeiliSearchClient.getInstance(); + + const { status } = await client.health(); + if (status !== 'available') { + throw new Error('Meilisearch not available'); + } + + /** Ensures indexes have proper filterable attributes configured */ + const result = await ensureFilterableAttributes(client); + settingsUpdated = result.settingsUpdated; + _orphanedDocsFound = result.orphanedDocsFound; + } else { + // Generic provider path (OpenSearch, etc.) + const provider = getSearchProvider ? getSearchProvider() : null; + if (!provider) { + throw new Error('Search provider not configured'); + } - /** Ensures indexes have proper filterable attributes configured */ - const { settingsUpdated, orphanedDocsFound: _orphanedDocsFound } = - await ensureFilterableAttributes(client); + const healthy = await provider.healthCheck(); + if (!healthy) { + throw new Error(`${providerType} not available`); + } + + const result = await ensureFilterableAttributesGeneric(provider); + settingsUpdated = result.settingsUpdated; + _orphanedDocsFound = result.orphanedDocsFound; + } let messagesSync = false; let convosSync = false; @@ -239,7 +381,7 @@ async function performSync(flowManager, flowId, flowType) { if (settingsUpdated || unindexedMessages > syncThreshold) { logger.info(`[indexSync] Starting message sync (${unindexedMessages} unindexed)`); - await Message.syncWithMeili(); + await (Message.syncWithSearch || Message.syncWithMeili).call(Message); messagesSync = true; } else if (unindexedMessages > 0) { logger.info( @@ -265,7 +407,7 @@ async function performSync(flowManager, flowId, flowType) { const unindexedConvos = convoCount - convosIndexed; if 
(settingsUpdated || unindexedConvos > syncThreshold) { logger.info(`[indexSync] Starting convos sync (${unindexedConvos} unindexed)`); - await Conversation.syncWithMeili(); + await (Conversation.syncWithSearch || Conversation.syncWithMeili).call(Conversation); convosSync = true; } else if (unindexedConvos > 0) { logger.info( @@ -315,8 +457,9 @@ async function indexSync() { }); // Use a unique flow ID for the sync operation - const flowId = 'meili-index-sync'; - const flowType = 'MEILI_SYNC'; + const providerType = detectSearchProvider ? detectSearchProvider() : 'meilisearch'; + const flowId = `search-index-sync-${providerType}`; + const flowType = 'SEARCH_SYNC'; try { // This will only execute the handler if no other instance is running the sync @@ -341,14 +484,17 @@ async function indexSync() { logger.debug('[indexSync] Creating indices...'); currentTimeout = setTimeout(async () => { try { - await Message.syncWithMeili(); - await Conversation.syncWithMeili(); + await (Message.syncWithSearch || Message.syncWithMeili).call(Message); + await (Conversation.syncWithSearch || Conversation.syncWithMeili).call(Conversation); } catch (err) { logger.error('[indexSync] Trouble creating indices, try restarting the server.', err); } }, 750); - } else if (err.message.includes('Meilisearch not configured')) { - logger.info('[indexSync] Meilisearch not configured, search will be disabled.'); + } else if ( + err.message.includes('Meilisearch not configured') || + err.message.includes('Search provider not configured') + ) { + logger.info('[indexSync] Search provider not configured, search will be disabled.'); } else { logger.error('[indexSync] error', err); } diff --git a/api/models/Conversation.js b/api/models/Conversation.js index 32eac1a76419..8611d52865c2 100644 --- a/api/models/Conversation.js +++ b/api/models/Conversation.js @@ -187,17 +187,37 @@ module.exports = { if (search) { try { - const meiliResults = await Conversation.meiliSearch(search, { filter: `user = "${user}"` 
}); - const matchingIds = Array.isArray(meiliResults.hits) - ? meiliResults.hits.map((result) => result.conversationId) + // Use Mongoose model's meiliSearch method which is aliased to searchIndex + // and works with all search providers (MeiliSearch, OpenSearch, Typesense) + const { Message } = require('~/db/models'); + + // Search both conversations (by title) and messages (by content) + const [convoResults, messageResults] = await Promise.all([ + Conversation.meiliSearch(search, { filter: `user = "${user}"` }), + Message.meiliSearch(search, { filter: `user = "${user}"` }), + ]); + + logger.info(`[getConvosByCursor] Search results - convo hits: ${convoResults?.hits?.length || 0}, message hits: ${messageResults?.hits?.length || 0}`); + + const convoIds = Array.isArray(convoResults?.hits) + ? convoResults.hits.map((result) => result.conversationId) + : []; + const messageConvoIds = Array.isArray(messageResults?.hits) + ? messageResults.hits.map((result) => result.conversationId) : []; + + // Combine and deduplicate conversation IDs from both searches + const matchingIds = [...new Set([...convoIds, ...messageConvoIds])]; + + logger.info(`[getConvosByCursor] Total matching conversation IDs: ${matchingIds.length}`); + if (!matchingIds.length) { return { conversations: [], nextCursor: null }; } filters.push({ conversationId: { $in: matchingIds } }); } catch (error) { - logger.error('[getConvosByCursor] Error during meiliSearch', error); - throw new Error('Error during meiliSearch'); + logger.error('[getConvosByCursor] Error during search', error); + throw new Error('Error during search'); } } diff --git a/api/server/routes/search.js b/api/server/routes/search.js index 2cd2fc3534ed..84c7f20feee8 100644 --- a/api/server/routes/search.js +++ b/api/server/routes/search.js @@ -3,6 +3,8 @@ const { MeiliSearch } = require('meilisearch'); const { isEnabled } = require('@librechat/api'); const requireJwtAuth = require('~/server/middleware/requireJwtAuth'); +const { 
getSearchProvider, detectSearchProvider } = require('@librechat/data-schemas'); + const router = express.Router(); router.use(requireJwtAuth); @@ -13,6 +15,19 @@ router.get('/enable', async function (req, res) { } try { + const providerType = detectSearchProvider(); + + if (providerType && providerType !== 'meilisearch') { + // Use generic search provider (OpenSearch, etc.) + const provider = getSearchProvider(); + if (!provider) { + return res.send(false); + } + const healthy = await provider.healthCheck(); + return res.send(healthy); + } + + // Default: MeiliSearch (backward compatible) const client = new MeiliSearch({ host: process.env.MEILI_HOST, apiKey: process.env.MEILI_MASTER_KEY, diff --git a/config/reset-meili-sync.js b/config/reset-meili-sync.js index 60f45246a878..9ba91d4a8c4e 100644 --- a/config/reset-meili-sync.js +++ b/config/reset-meili-sync.js @@ -10,14 +10,15 @@ const { batchResetMeiliFlags } = require('~/db/utils'); await connect(); console.purple('---------------------------------------'); - console.purple('Reset MeiliSearch Synchronization Flags'); + console.purple('Reset Search Synchronization Flags'); console.purple('---------------------------------------'); - console.yellow('\nThis script will reset the MeiliSearch indexing flags in MongoDB.'); - console.yellow('Use this when MeiliSearch data has been deleted or corrupted,'); - console.yellow('and you need to trigger a full re-synchronization.\n'); + console.yellow('\nThis script will reset the search indexing flags in MongoDB.'); + console.yellow('Use this when search index data has been deleted or corrupted,'); + console.yellow('and you need to trigger a full re-synchronization.'); + console.yellow('Works with MeiliSearch, OpenSearch, and Typesense.\n'); const confirm = await askQuestion( - 'Are you sure you want to reset all MeiliSearch sync flags? (y/N): ', + 'Are you sure you want to reset all search sync flags? 
(y/N): ', ); if (confirm.toLowerCase() !== 'y') { @@ -55,12 +56,12 @@ const { batchResetMeiliFlags } = require('~/db/utils'); .countDocuments(queryTotal); console.purple('\n---------------------------------------'); - console.green('MeiliSearch sync flags have been reset successfully!'); + console.green('Search sync flags have been reset successfully!'); console.cyan(`\nDocuments queued for sync:`); console.cyan(`Messages: ${totalMessages}`); console.cyan(`Conversations: ${totalConversations}`); console.yellow('\nThe next time LibreChat starts or performs a sync check,'); - console.yellow('all data will be re-indexed into MeiliSearch.'); + console.yellow('all data will be re-indexed into your configured search provider.'); console.purple('---------------------------------------\n'); // Ask if user wants to see advanced options @@ -81,7 +82,7 @@ const { batchResetMeiliFlags } = require('~/db/utils'); silentExit(0); } catch (error) { - console.red('\nError resetting MeiliSearch sync flags:'); + console.red('\nError resetting search sync flags:'); console.error(error); silentExit(1); } diff --git a/docker-compose.override.yml.example b/docker-compose.override.yml.example index 8c8aba9ed089..860af7157feb 100644 --- a/docker-compose.override.yml.example +++ b/docker-compose.override.yml.example @@ -102,6 +102,47 @@ # ports: # - 7700:7700 +# # USE OPENSEARCH INSTEAD OF MEILISEARCH +# # 1. Disable MeiliSearch above (add it to the "donotstart" profile) +# # 2. 
Set in .env: SEARCH=true, SEARCH_PROVIDER=opensearch, OPENSEARCH_HOST=http://opensearch-node1:9200, OPENSEARCH_INSECURE=true +# opensearch-node1: +# image: opensearchproject/opensearch:3.4.0 +# container_name: opensearch-node1 +# environment: +# - cluster.name=opensearch-cluster +# - node.name=opensearch-node1 +# - discovery.type=single-node +# - bootstrap.memory_lock=true +# - DISABLE_SECURITY_PLUGIN=true +# - DISABLE_INSTALL_DEMO_CONFIG=true +# - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" +# ulimits: +# memlock: +# soft: -1 +# hard: -1 +# nofile: +# soft: 65536 +# hard: 65536 +# volumes: +# - opensearch-data:/usr/share/opensearch/data +# ports: +# - 9200:9200 + +# # USE TYPESENSE INSTEAD OF MEILISEARCH +# # 1. Disable MeiliSearch above (add it to the "donotstart" profile) +# # 2. Set in .env: SEARCH=true, SEARCH_PROVIDER=typesense, TYPESENSE_HOST=http://typesense:8108, TYPESENSE_API_KEY=xyz +# typesense: +# image: typesense/typesense:30.1 +# container_name: typesense +# environment: +# - TYPESENSE_DATA_DIR=/data +# - TYPESENSE_API_KEY=xyz +# - TYPESENSE_ENABLE_CORS=true +# volumes: +# - typesense-data:/data +# ports: +# - 8108:8108 + # # USE RAG API IMAGE WITH LOCAL EMBEDDINGS SUPPORT # rag_api: # image: ghcr.io/danny-avila/librechat-rag-api-dev:latest @@ -179,3 +220,8 @@ # - POSTGRES_DB=postgres # volumes: # - ./postgres:/var/lib/postgresql/data + +# # VOLUMES FOR ALTERNATIVE SEARCH BACKENDS (uncomment if using OpenSearch or Typesense above) +# volumes: +# opensearch-data: +# typesense-data: diff --git a/docker-compose.yml b/docker-compose.yml index bd39de343ee9..123096057093 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,6 +46,52 @@ services: - MEILI_MASTER_KEY=${MEILI_MASTER_KEY} volumes: - ./meili_data_v1.35.1:/meili_data + + # ---- OpenSearch (alternative to MeiliSearch) ---- + # To use OpenSearch instead of MeiliSearch, create a docker-compose.override.yml with: + # 1. The 'opensearch' service definition (copy from below and uncomment) + # 2. 
Override the 'api' service environment to remove MEILI_HOST + # 3. In .env, set: + # SEARCH_PROVIDER=opensearch + # OPENSEARCH_HOST=http://opensearch:9200 + # OPENSEARCH_INSECURE=true + # See docker-compose.override.yaml.example for a working example. + # opensearch: + # container_name: chat-opensearch + # image: opensearchproject/opensearch:3.4.0 + # restart: always + # environment: + # - discovery.type=single-node + # - DISABLE_SECURITY_PLUGIN=true + # - DISABLE_INSTALL_DEMO_CONFIG=true + # - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m + # volumes: + # - opensearch_data:/usr/share/opensearch/data + # ports: + # - "9200:9200" + + # ---- Typesense (alternative to MeiliSearch) ---- + # To use Typesense instead of MeiliSearch, create a docker-compose.override.yml with: + # 1. The 'typesense' service definition (copy from below and uncomment) + # 2. Override the 'api' service environment to remove MEILI_HOST + # 3. In .env, set: + # SEARCH_PROVIDER=typesense + # TYPESENSE_HOST=http://typesense:8108 + # TYPESENSE_API_KEY=xyz (match the --api-key below) + # See docker-compose.override.yaml.example for a working example. + # typesense: + # container_name: chat-typesense + # image: typesense/typesense:30.1 + # restart: always + # environment: + # - TYPESENSE_DATA_DIR=/data + # - TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-xyz} + # volumes: + # - typesense_data:/data + # ports: + # - "8108:8108" + # command: "--data-dir /data --api-key=${TYPESENSE_API_KEY:-xyz} --enable-cors" + vectordb: container_name: vectordb image: pgvector/pgvector:0.8.0-pg15-trixie @@ -70,3 +116,7 @@ services: volumes: pgdata2: + # Uncomment when using OpenSearch: + # opensearch_data: + # Uncomment when using Typesense: + # typesense_data: diff --git a/helm/librechat/Chart.yaml b/helm/librechat/Chart.yaml index 262595634477..2cc70d0f2e49 100755 --- a/helm/librechat/Chart.yaml +++ b/helm/librechat/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. 
This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.9.8 +version: 1.10.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to @@ -36,6 +36,10 @@ dependencies: version: "0.11.0" condition: meilisearch.enabled repository: "https://meilisearch.github.io/meilisearch-kubernetes" + - name: opensearch + version: "3.4.0" + condition: opensearch.enabled + repository: "https://opensearch-project.github.io/helm-charts" - name: redis version: "24.1.3" condition: redis.enabled diff --git a/helm/librechat/templates/NOTES.txt b/helm/librechat/templates/NOTES.txt index cf2232b58a8e..2a0d8056bebc 100755 --- a/helm/librechat/templates/NOTES.txt +++ b/helm/librechat/templates/NOTES.txt @@ -20,3 +20,15 @@ echo "Visit http://127.0.0.1:8080 to use your application" kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT {{- end }} + +2. Search Backend: +{{- if .Values.meilisearch.enabled }} + Search is powered by MeiliSearch (default). +{{- else if .Values.opensearch.enabled }} + Search is powered by OpenSearch. + OpenSearch supports multi-replica mode and is Apache-2.0 licensed. +{{- else if and (hasKey .Values "typesense") .Values.typesense.enabled }} + Search is powered by Typesense. +{{- else }} + No search backend is enabled. Conversation search will be disabled. +{{- end }} diff --git a/helm/librechat/templates/_checks.yaml b/helm/librechat/templates/_checks.yaml index fb982c234883..670e803ba7da 100755 --- a/helm/librechat/templates/_checks.yaml +++ b/helm/librechat/templates/_checks.yaml @@ -1,4 +1,12 @@ {{ if hasKey .Values.env }} {{ fail "The Value env has been renamed and moved to librechat.configEnv. 
Refer to https://www.librechat.ai/docs/local/helm_chart#migrate for more information" }} -{{ end }} \ No newline at end of file +{{ end }} + +{{- $searchBackendCount := 0 }} +{{- if .Values.meilisearch.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if .Values.opensearch.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if and (hasKey .Values "typesense") .Values.typesense.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if gt (int $searchBackendCount) 1 }} + {{ fail "Multiple search backends are enabled. Please enable only one of: meilisearch, opensearch, or typesense." }} +{{- end }} \ No newline at end of file diff --git a/helm/librechat/templates/checks.yaml b/helm/librechat/templates/checks.yaml new file mode 100644 index 000000000000..b1ebe3b6a3ca --- /dev/null +++ b/helm/librechat/templates/checks.yaml @@ -0,0 +1,13 @@ +{{- /* Validation checks — rendered by Helm to catch configuration errors early */ -}} + +{{ if hasKey .Values "env" }} + {{ fail "The Value env has been renamed and moved to librechat.configEnv. Refer to https://www.librechat.ai/docs/local/helm_chart#migrate for more information" }} +{{ end }} + +{{- $searchBackendCount := 0 }} +{{- if .Values.meilisearch.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if .Values.opensearch.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if and (hasKey .Values "typesense") .Values.typesense.enabled }}{{ $searchBackendCount = add $searchBackendCount 1 }}{{- end }} +{{- if gt (int $searchBackendCount) 1 }} + {{ fail "Multiple search backends are enabled. Please enable only one of: meilisearch, opensearch, or typesense." 
}} +{{- end }} diff --git a/helm/librechat/templates/configmap-env.yaml b/helm/librechat/templates/configmap-env.yaml index ed5ac822dac9..3f115634e220 100755 --- a/helm/librechat/templates/configmap-env.yaml +++ b/helm/librechat/templates/configmap-env.yaml @@ -9,6 +9,52 @@ data: {{- if and (not (dig "configEnv" "MEILI_HOST" "" .Values.librechat)) .Values.meilisearch.enabled }} MEILI_HOST: http://{{ include "meilisearch.fullname" .Subcharts.meilisearch }}.{{ .Release.Namespace | lower }}.svc.cluster.local:7700 {{- end }} + {{- if .Values.opensearch.enabled }} + {{- if not (dig "configEnv" "SEARCH_PROVIDER" "" .Values.librechat) }} + SEARCH_PROVIDER: opensearch + {{- end }} + {{- if not (dig "configEnv" "OPENSEARCH_HOST" "" .Values.librechat) }} + {{- $opensearchProto := "http" }} + {{- $securityDisabled := false }} + {{- range (.Values.opensearch.extraEnvs | default list) }} + {{- if and (eq .name "DISABLE_SECURITY_PLUGIN") (eq .value "true") }} + {{- $securityDisabled = true }} + {{- end }} + {{- end }} + {{- if not $securityDisabled }} + {{- $opensearchProto = "https" }} + {{- end }} + {{- $osClusterName := .Values.opensearch.clusterName | default "opensearch-cluster" }} + {{- $osNodeGroup := .Values.opensearch.nodeGroup | default "master" }} + {{- $osMasterService := .Values.opensearch.masterService | default (printf "%s-%s" $osClusterName $osNodeGroup) }} + OPENSEARCH_HOST: "{{ $opensearchProto }}://{{ $osMasterService }}.{{ .Release.Namespace | lower }}.svc.cluster.local:9200" + {{- end }} + {{- if not (dig "configEnv" "OPENSEARCH_INSECURE" "" .Values.librechat) }} + {{- if $securityDisabled }} + OPENSEARCH_INSECURE: "true" + {{- else }} + OPENSEARCH_INSECURE: "false" + {{- end }} + {{- end }} + {{- if not (dig "configEnv" "OPENSEARCH_USERNAME" "" .Values.librechat) }} + {{- if and (hasKey .Values.opensearch "opensearchUsername") .Values.opensearch.opensearchUsername }} + OPENSEARCH_USERNAME: {{ .Values.opensearch.opensearchUsername | quote }} + {{- end }} + 
{{- end }} + {{- /* OPENSEARCH_PASSWORD should be provided via the existing secret (librechat-credentials-env) */ -}} + {{- /* or via global.librechat.env with secretKeyRef — never in a ConfigMap. */ -}} + {{- end }} + {{- /* Typesense auto-wiring: only when typesense is enabled via values */ -}} + {{- if and (hasKey .Values "typesense") .Values.typesense.enabled }} + {{- if not (dig "configEnv" "SEARCH_PROVIDER" "" .Values.librechat) }} + SEARCH_PROVIDER: typesense + {{- end }} + {{- if not (dig "configEnv" "TYPESENSE_HOST" "" .Values.librechat) }} + TYPESENSE_HOST: "http://{{ .Release.Name }}-typesense.{{ .Release.Namespace | lower }}.svc.cluster.local:8108" + {{- end }} + {{- /* TYPESENSE_API_KEY should be provided via the existing secret (librechat-credentials-env) */ -}} + {{- /* or via global.librechat.env with secretKeyRef — never in a ConfigMap. */ -}} + {{- end }} {{- if and (not (dig "configEnv" "MONGO_URI" "" .Values.librechat)) .Values.mongodb.enabled }} MONGO_URI: mongodb://{{ include "mongodb.service.nameOverride" .Subcharts.mongodb }}.{{ .Release.Namespace | lower }}.svc.cluster.local:27017/LibreChat {{- end }} diff --git a/helm/librechat/values.yaml b/helm/librechat/values.yaml index a4c877d64df8..d5bb14f09e3e 100755 --- a/helm/librechat/values.yaml +++ b/helm/librechat/values.yaml @@ -14,7 +14,9 @@ global: # - CREDS_IV # - JWT_SECRET # - JWT_REFRESH_SECRET - # - MEILI_MASTER_KEY + # - MEILI_MASTER_KEY (if using MeiliSearch) + # - OPENSEARCH_PASSWORD (if using OpenSearch with security enabled) + # - TYPESENSE_API_KEY (if using Typesense) librechat: existingSecretName: "librechat-credentials-env" # Used for Setting the Right Key, can be something like AZURE_API_KEY, if Azure OpenAI is used @@ -310,6 +312,128 @@ meilisearch: # Use an existing Kubernetes secret for the MEILI_MASTER_KEY existingMasterKeySecret: "librechat-credentials-env" +# OpenSearch Parameters +# Enable this as an alternative to MeiliSearch for conversation/message search. 
+# When enabled, set meilisearch.enabled=false and configure SEARCH_PROVIDER=opensearch in configEnv. +# OpenSearch supports multi-replica mode and is Apache-2.0 licensed. +# +# ---- Connecting to an external/remote OpenSearch cluster ---- +# If you already have an OpenSearch cluster running outside this chart, keep opensearch.enabled=false +# and set the connection details directly in librechat.configEnv: +# librechat: +# configEnv: +# SEARCH_PROVIDER: opensearch +# OPENSEARCH_HOST: "https://my-remote-opensearch:9200" +# OPENSEARCH_USERNAME: "admin" +# OPENSEARCH_INSECURE: "true" # set to "false" if using valid TLS certs +# Then provide OPENSEARCH_PASSWORD via the existing secret or secretKeyRef (see below). +opensearch: + enabled: false + # singleNode: true for development, false for production (multi-replica) + singleNode: true + replicas: 1 + + # ---- Image Configuration ---- + # Supports both official and Docker Hardened Images (DHI). + # To switch to DHI, set: + # global.dockerRegistry: "dhi.io" + # image.repository: "opensearch" + # image.tag: "3.4.0-debian13" + # persistence.image: "busybox" + # persistence.imageTag: "1.37.0-alpine3.22" + # imagePullSecrets: [{ name: "dhi-pull-secret" }] + global: + dockerRegistry: "" + image: + repository: "opensearchproject/opensearch" + tag: "3.4.0" + pullPolicy: "IfNotPresent" + + # OpenSearch Java memory settings + opensearchJavaOpts: "-Xmx512M -Xms512M" + + # Persistence + persistence: + enabled: true + size: 8Gi + # storageClass: "" + + # Security — for development, you can disable the security plugin. + # For production, configure proper TLS and authentication. 
+ extraEnvs: + - name: DISABLE_SECURITY_PLUGIN + value: "true" + - name: DISABLE_INSTALL_DEMO_CONFIG + value: "true" + # # For production with security enabled, set the admin password: + # extraEnvs: + # - name: OPENSEARCH_INITIAL_ADMIN_PASSWORD + # valueFrom: + # secretKeyRef: + # name: opensearch-credentials + # key: admin-password + + # Resources + resources: + requests: + cpu: "500m" + memory: "512Mi" + + # Credentials for LibreChat to connect to OpenSearch. + # Username is auto-wired into the configmap (non-sensitive). + # opensearchUsername: "admin" + # + # OPENSEARCH_PASSWORD must be provided via the existing secret or secretKeyRef: + # Option A: Add OPENSEARCH_PASSWORD to the secret referenced by global.librechat.existingSecretName + # Option B: Use global.librechat.env: + # global: + # librechat: + # env: + # - name: OPENSEARCH_PASSWORD + # valueFrom: + # secretKeyRef: + # name: opensearch-credentials + # key: password + + # Pull secrets (required for DHI images, not needed for official images) + imagePullSecrets: [] + # - name: dhi-pull-secret + +# Typesense Parameters +# Enable this as an alternative to MeiliSearch for conversation/message search. +# When enabled, set meilisearch.enabled=false. +# Typesense is a fast, typo-tolerant search engine with Raft-based clustering. +# NOTE: Typesense is not a subchart dependency — deploy it separately +# (e.g. via the Typesense Kubernetes Operator or a community Helm chart). +# The values below configure how LibreChat connects to an existing Typesense instance. +# +# ---- Connecting to an external/remote Typesense instance ---- +# You can also keep typesense.enabled=false and set the connection details directly: +# librechat: +# configEnv: +# SEARCH_PROVIDER: typesense +# TYPESENSE_HOST: "http://my-remote-typesense:8108" +# Then provide TYPESENSE_API_KEY via the existing secret or secretKeyRef (see below). 
+typesense: + enabled: false + + # Connection settings (auto-wired into the configmap) + # host: "http://typesense:8108" + + # Image reference (for documentation; deploy Typesense separately) + # image: typesense/typesense:30.1 + + # TYPESENSE_API_KEY must be provided via the existing secret or secretKeyRef: + # Option A: Add TYPESENSE_API_KEY to the secret referenced by global.librechat.existingSecretName + # Option B: Use global.librechat.env: + # global: + # librechat: + # env: + # - name: TYPESENSE_API_KEY + # valueFrom: + # secretKeyRef: + # name: typesense-credentials + # key: api-key # Redis Parameters redis: enabled: false diff --git a/packages/data-schemas/src/index.ts b/packages/data-schemas/src/index.ts index a9c9a560787e..a05a73b8088d 100644 --- a/packages/data-schemas/src/index.ts +++ b/packages/data-schemas/src/index.ts @@ -9,3 +9,9 @@ export type * from './types'; export type * from './methods'; export { default as logger } from './config/winston'; export { default as meiliLogger } from './config/meiliLogger'; +export { + getSearchProvider, + resetSearchProvider, + detectSearchProvider, + isSearchEnabled, +} from './models/plugins/search'; diff --git a/packages/data-schemas/src/models/convo.ts b/packages/data-schemas/src/models/convo.ts index da0a8c68cf34..9ad05028f491 100644 --- a/packages/data-schemas/src/models/convo.ts +++ b/packages/data-schemas/src/models/convo.ts @@ -1,12 +1,24 @@ import type * as t from '~/types'; import mongoMeili from '~/models/plugins/mongoMeili'; +import mongoSearch from '~/models/plugins/mongoSearch'; +import { detectSearchProvider } from '~/models/plugins/search'; import convoSchema from '~/schema/convo'; /** - * Creates or returns the Conversation model using the provided mongoose instance and schema + * Creates or returns the Conversation model using the provided mongoose instance and schema. 
+ * + * Supports multiple search backends via the search provider abstraction: + * - MeiliSearch (default, backward compatible with existing MEILI_HOST/MEILI_MASTER_KEY) + * - OpenSearch (via SEARCH_PROVIDER=opensearch or OPENSEARCH_HOST) + * - Typesense (via SEARCH_PROVIDER=typesense or TYPESENSE_HOST + TYPESENSE_API_KEY) + * + * For MeiliSearch, the original mongoMeili plugin is used to maintain full backward compatibility. + * For other providers, the new mongoSearch plugin is used. */ export function createConversationModel(mongoose: typeof import('mongoose')) { - if (process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) { + const provider = detectSearchProvider(); + + if (provider === 'meilisearch' && process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) { convoSchema.plugin(mongoMeili, { mongoose, host: process.env.MEILI_HOST, @@ -15,7 +27,14 @@ export function createConversationModel(mongoose: typeof import('mongoose')) { indexName: 'convos', primaryKey: 'conversationId', }); + } else if (provider && provider !== 'meilisearch') { + convoSchema.plugin(mongoSearch, { + mongoose, + indexName: 'convos', + primaryKey: 'conversationId', + }); } + return ( mongoose.models.Conversation || mongoose.model('Conversation', convoSchema) ); diff --git a/packages/data-schemas/src/models/message.ts b/packages/data-schemas/src/models/message.ts index 3a81211e6804..c5a19d3baea6 100644 --- a/packages/data-schemas/src/models/message.ts +++ b/packages/data-schemas/src/models/message.ts @@ -1,12 +1,21 @@ import type * as t from '~/types'; import mongoMeili from '~/models/plugins/mongoMeili'; +import mongoSearch from '~/models/plugins/mongoSearch'; +import { detectSearchProvider } from '~/models/plugins/search'; import messageSchema from '~/schema/message'; /** - * Creates or returns the Message model using the provided mongoose instance and schema + * Creates or returns the Message model using the provided mongoose instance and schema. 
+ * + * Supports multiple search backends via the search provider abstraction: + * - MeiliSearch (default, backward compatible with existing MEILI_HOST/MEILI_MASTER_KEY) + * - OpenSearch (via SEARCH_PROVIDER=opensearch or OPENSEARCH_HOST) + * - Typesense (via SEARCH_PROVIDER=typesense or TYPESENSE_HOST + TYPESENSE_API_KEY) */ export function createMessageModel(mongoose: typeof import('mongoose')) { - if (process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) { + const provider = detectSearchProvider(); + + if (provider === 'meilisearch' && process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) { messageSchema.plugin(mongoMeili, { mongoose, host: process.env.MEILI_HOST, @@ -14,6 +23,12 @@ export function createMessageModel(mongoose: typeof import('mongoose')) { indexName: 'messages', primaryKey: 'messageId', }); + } else if (provider && provider !== 'meilisearch') { + messageSchema.plugin(mongoSearch, { + mongoose, + indexName: 'messages', + primaryKey: 'messageId', + }); } return mongoose.models.Message || mongoose.model('Message', messageSchema); diff --git a/packages/data-schemas/src/models/plugins/mongoSearch.ts b/packages/data-schemas/src/models/plugins/mongoSearch.ts new file mode 100644 index 000000000000..d065d83a128e --- /dev/null +++ b/packages/data-schemas/src/models/plugins/mongoSearch.ts @@ -0,0 +1,637 @@ +import _ from 'lodash'; +import { parseTextParts } from 'librechat-data-provider'; +import type { + CallbackWithoutResultAndOptionalError, + FilterQuery, + Document, + Schema, + Query, + Types, + Model, +} from 'mongoose'; +import type { IConversation, IMessage } from '~/types'; +import type { SearchProvider, SearchHit, SearchResult } from './search/searchProvider'; +import { getSearchProvider } from './search/searchProviderFactory'; +import logger from '~/config/meiliLogger'; + +interface MongoSearchOptions { + indexName: string; + primaryKey: string; + mongoose: typeof import('mongoose'); + syncBatchSize?: number; + syncDelayMs?: number; +} + 
+// NOTE(review): the generic type parameters below were reconstructed from how these
+// members are used in the plugin body — confirm against the upstream source.
+interface SearchIndexable {
+  [key: string]: unknown;
+  _meiliIndex?: boolean;
+}
+
+/** Progress snapshot reported by `getSyncProgress`. */
+interface SyncProgress {
+  lastSyncedId?: string;
+  totalProcessed: number;
+  totalDocuments: number;
+  isComplete: boolean;
+}
+
+/** Instance-method surface the plugin mixes into documents; `_meiliIndex` tracks sync state. */
+interface _DocumentWithSearchIndex extends Document {
+  _meiliIndex?: boolean;
+  preprocessObjectForIndex?: () => Record<string, unknown>;
+  addObjectToSearchIndex?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
+  updateObjectInSearchIndex?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
+  deleteObjectFromSearchIndex?: (next: CallbackWithoutResultAndOptionalError) => Promise<void>;
+  postSaveHook?: (next: CallbackWithoutResultAndOptionalError) => void;
+  postUpdateHook?: (next: CallbackWithoutResultAndOptionalError) => void;
+  postRemoveHook?: (next: CallbackWithoutResultAndOptionalError) => void;
+}
+
+export type DocumentWithSearchIndex = _DocumentWithSearchIndex & IConversation & Partial<IMessage>;
+
+/** Static-method surface the plugin adds to the model via `schema.loadClass`. */
+export interface SchemaWithSearchMethods extends Model<DocumentWithSearchIndex> {
+  syncWithSearch(): Promise<void>;
+  getSyncProgress(): Promise<SyncProgress>;
+  processSyncBatch(
+    provider: SearchProvider,
+    indexName: string,
+    documents: Array<Record<string, unknown>>,
+  ): Promise<void>;
+  cleanupSearchIndex(
+    provider: SearchProvider,
+    indexName: string,
+    primaryKey: string,
+    batchSize: number,
+    delayMs: number,
+  ): Promise<void>;
+  setSearchIndexSettings(settings: Record<string, unknown>): Promise<void>;
+  searchIndex(
+    q: string,
+    params?: { filter?: string; limit?: number; offset?: number },
+    populate?: boolean,
+  ): Promise<SearchResult>;
+}
+
+// Environment flags — kept backward compatible with existing MEILI_* vars
+const searchEnabled = process.env.SEARCH != null && process.env.SEARCH.toLowerCase() === 'true';
+
+// A provider counts as configured when its selector env var(s) are present;
+// MeiliSearch remains the fallback for backward compatibility.
+const searchConfigured = (() => {
+  const provider = process.env.SEARCH_PROVIDER?.toLowerCase();
+  if (provider === 'opensearch' || process.env.OPENSEARCH_HOST) {
+    return true;
+  }
+  if (provider === 'typesense' || (process.env.TYPESENSE_HOST && process.env.TYPESENSE_API_KEY)) {
+    return true;
+  }
+  return process.env.MEILI_HOST != null &&
process.env.MEILI_MASTER_KEY != null; +})(); + +const isEnabled = searchEnabled && searchConfigured; + +const getSyncConfig = () => ({ + batchSize: parseInt(process.env.MEILI_SYNC_BATCH_SIZE || '100', 10), + delayMs: parseInt(process.env.MEILI_SYNC_DELAY_MS || '100', 10), +}); + +const validateOptions = (options: Partial): void => { + const requiredKeys: (keyof MongoSearchOptions)[] = ['indexName']; + requiredKeys.forEach((key) => { + if (!options[key]) { + throw new Error(`Missing mongoSearch Option: ${key}`); + } + }); +}; + +const processBatch = async ( + items: T[], + batchSize: number, + delayMs: number, + processor: (batch: T[]) => Promise, +): Promise => { + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + await processor(batch); + + if (i + batchSize < items.length && delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } +}; + +const createSearchMongooseModel = ({ + provider, + indexName, + attributesToIndex, + syncOptions, +}: { + provider: SearchProvider; + indexName: string; + attributesToIndex: string[]; + syncOptions: { batchSize: number; delayMs: number }; +}) => { + const primaryKey = attributesToIndex[0]; + const syncConfig = { ...getSyncConfig(), ...syncOptions }; + + class SearchMongooseModel { + static async getSyncProgress(this: SchemaWithSearchMethods): Promise { + const totalDocuments = await this.countDocuments({ expiredAt: null }); + const indexedDocuments = await this.countDocuments({ expiredAt: null, _meiliIndex: true }); + + return { + totalProcessed: indexedDocuments, + totalDocuments, + isComplete: indexedDocuments === totalDocuments, + }; + } + + static async syncWithSearch(this: SchemaWithSearchMethods): Promise { + const startTime = Date.now(); + const { batchSize, delayMs } = syncConfig; + + const collectionName = primaryKey === 'messageId' ? 
'messages' : 'conversations'; + logger.info( + `[syncWithSearch] Starting sync for ${collectionName} with batch size ${batchSize}`, + ); + + const approxTotalCount = await this.estimatedDocumentCount(); + logger.info( + `[syncWithSearch] Approximate total number of all ${collectionName}: ${approxTotalCount}`, + ); + + try { + logger.info(`[syncWithSearch] Starting cleanup of index ${indexName} before sync`); + await this.cleanupSearchIndex(provider, indexName, primaryKey, batchSize, delayMs); + logger.info(`[syncWithSearch] Completed cleanup of index: ${indexName}`); + } catch (error) { + logger.error('[syncWithSearch] Error during cleanup before sync:', error); + throw error; + } + + let processedCount = 0; + let hasMore = true; + + while (hasMore) { + const query: FilterQuery = { + expiredAt: null, + _meiliIndex: false, + }; + + try { + const documents = await this.find(query) + .select(attributesToIndex.join(' ') + ' _meiliIndex') + .limit(batchSize) + .lean(); + + if (documents.length === 0) { + logger.info('[syncWithSearch] No more documents to process'); + break; + } + + await this.processSyncBatch(provider, indexName, documents); + processedCount += documents.length; + logger.info(`[syncWithSearch] Processed: ${processedCount}`); + + if (documents.length < batchSize) { + hasMore = false; + } + + if (hasMore && delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } catch (error) { + logger.error('[syncWithSearch] Error processing documents batch:', error); + throw error; + } + } + + const duration = Date.now() - startTime; + logger.info( + `[syncWithSearch] Completed sync for ${collectionName}. 
Processed ${processedCount} documents in ${duration}ms`, + ); + } + + static async processSyncBatch( + this: SchemaWithSearchMethods, + provider: SearchProvider, + indexName: string, + documents: Array>, + ): Promise { + if (documents.length === 0) { + return; + } + + const formattedDocs = documents.map((doc) => + _.omitBy(_.pick(doc, attributesToIndex), (_v, k) => k.startsWith('$')), + ); + + try { + await provider.addDocumentsInBatches(indexName, formattedDocs, primaryKey); + + const docsIds = documents.map((doc) => doc._id); + await this.updateMany({ _id: { $in: docsIds } }, { $set: { _meiliIndex: true } }); + } catch (error) { + logger.error('[processSyncBatch] Error processing batch:', error); + throw error; + } + } + + static async cleanupSearchIndex( + this: SchemaWithSearchMethods, + provider: SearchProvider, + indexName: string, + primaryKey: string, + batchSize: number, + delayMs: number, + ): Promise { + try { + let offset = 0; + let moreDocuments = true; + + while (moreDocuments) { + const batch = await provider.getDocuments(indexName, { limit: batchSize, offset }); + if (batch.results.length === 0) { + moreDocuments = false; + break; + } + + const searchIds = batch.results.map((doc) => doc[primaryKey]); + const query: Record = {}; + query[primaryKey] = { $in: searchIds }; + + const existingDocs = await this.find(query).select(primaryKey).lean(); + + const existingIds = new Set( + existingDocs.map((doc: Record) => doc[primaryKey]), + ); + + const toDelete = searchIds.filter((id) => !existingIds.has(id)); + if (toDelete.length > 0) { + await provider.deleteDocuments(indexName, toDelete.map(String)); + logger.debug(`[cleanupSearchIndex] Deleted ${toDelete.length} orphaned documents`); + } + + if (batch.results.length < batchSize) { + break; + } + + offset += batchSize - toDelete.length; + + if (delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + } + } catch (error) { + logger.error('[cleanupSearchIndex] Error during 
cleanup:', error); + } + } + + static async setSearchIndexSettings(settings: Record): Promise { + return await provider.updateIndexSettings(indexName, { + filterableAttributes: settings.filterableAttributes as string[] | undefined, + searchableAttributes: settings.searchableAttributes as string[] | undefined, + sortableAttributes: settings.sortableAttributes as string[] | undefined, + }); + } + + static async searchIndex( + this: SchemaWithSearchMethods, + q: string, + params: { filter?: string; limit?: number; offset?: number }, + populate: boolean, + ): Promise { + const data = await provider.search(indexName, q, params); + + if (populate) { + const query: Record = {}; + query[primaryKey] = _.map(data.hits, (hit) => hit[primaryKey]); + + const projection = Object.keys(this.schema.obj).reduce>( + (results, key) => { + if (!key.startsWith('$')) { + results[key] = 1; + } + return results; + }, + { _id: 1, __v: 1 }, + ); + + const hitsFromMongoose = await this.find(query, projection).lean(); + + const populatedHits = data.hits.map((hit) => { + const queryObj: Record = {}; + queryObj[primaryKey] = hit[primaryKey]; + const originalHit = _.find(hitsFromMongoose, (item) => { + const typedItem = item as Record; + return typedItem[primaryKey] === hit[primaryKey]; + }); + + return { + ...(originalHit && typeof originalHit === 'object' ? 
originalHit : {}), + ...hit, + }; + }); + data.hits = populatedHits; + } + + return data; + } + + preprocessObjectForIndex(this: DocumentWithSearchIndex): Record { + const object = _.omitBy(_.pick(this.toJSON(), attributesToIndex), (v, k) => + k.startsWith('$'), + ); + + if ( + object.conversationId && + typeof object.conversationId === 'string' && + object.conversationId.includes('|') + ) { + object.conversationId = object.conversationId.replace(/\|/g, '--'); + } + + if (object.content && Array.isArray(object.content)) { + object.text = parseTextParts(object.content); + delete object.content; + } + + return object; + } + + async addObjectToSearchIndex( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): Promise { + if (!_.isNil(this.expiredAt)) { + return next(); + } + + const object = this.preprocessObjectForIndex!(); + const maxRetries = 3; + let retryCount = 0; + + while (retryCount < maxRetries) { + try { + await provider.addDocuments(indexName, [object], primaryKey); + break; + } catch (error) { + retryCount++; + if (retryCount >= maxRetries) { + logger.error( + '[addObjectToSearchIndex] Error adding document to search index after retries:', + error, + ); + return next(); + } + await new Promise((resolve) => setTimeout(resolve, Math.pow(2, retryCount) * 1000)); + } + } + + try { + await this.collection.updateMany( + { _id: this._id as Types.ObjectId }, + { $set: { _meiliIndex: true } }, + ); + } catch (error) { + logger.error('[addObjectToSearchIndex] Error updating _meiliIndex field:', error); + return next(); + } + + next(); + } + + async updateObjectInSearchIndex( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): Promise { + try { + const object = _.omitBy(_.pick(this.toJSON(), attributesToIndex), (v, k) => + k.startsWith('$'), + ); + await provider.updateDocuments(indexName, [object]); + next(); + } catch (error) { + logger.error('[updateObjectInSearchIndex] Error updating document in 
search index:', error); + return next(); + } + } + + async deleteObjectFromSearchIndex( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): Promise { + try { + await provider.deleteDocument(indexName, this._id as string); + next(); + } catch (error) { + logger.error( + '[deleteObjectFromSearchIndex] Error deleting document from search index:', + error, + ); + return next(); + } + } + + postSaveHook( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): void { + if (this._meiliIndex) { + this.updateObjectInSearchIndex!(next); + } else { + this.addObjectToSearchIndex!(next); + } + } + + postUpdateHook( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): void { + if (this._meiliIndex) { + this.updateObjectInSearchIndex!(next); + } else { + next(); + } + } + + postRemoveHook( + this: DocumentWithSearchIndex, + next: CallbackWithoutResultAndOptionalError, + ): void { + if (this._meiliIndex) { + this.deleteObjectFromSearchIndex!(next); + } else { + next(); + } + } + } + + return SearchMongooseModel; +}; + +/** + * Generic Mongoose plugin to synchronize MongoDB collections with a search index. + * + * This plugin is provider-agnostic — it works with MeiliSearch, OpenSearch, or any + * future provider that implements the SearchProvider interface. + * + * It maintains full backward compatibility: + * - The `_meiliIndex` field name is preserved so existing data doesn't need migration. + * - All existing MEILI_* environment variables continue to work. + * - The `meiliSearch` static method is aliased to `searchIndex` for backward compat. 
+ */ +export default function mongoSearch(schema: Schema, options: MongoSearchOptions): void { + const mongoose = options.mongoose; + validateOptions(options); + + schema.add({ + _meiliIndex: { + type: Boolean, + required: false, + select: false, + default: false, + }, + }); + + const { indexName, primaryKey } = options; + const syncOptions = { + batchSize: options.syncBatchSize || getSyncConfig().batchSize, + delayMs: options.syncDelayMs || getSyncConfig().delayMs, + }; + + const provider = getSearchProvider(); + if (!provider) { + logger.debug('[mongoSearch] No search provider configured, skipping plugin setup'); + return; + } + + // Create index and configure settings asynchronously + (async () => { + try { + await provider.createIndex(indexName, primaryKey); + } catch (error) { + logger.error(`[mongoSearch] Error creating index ${indexName}:`, error); + } + + try { + await provider.updateIndexSettings(indexName, { + filterableAttributes: ['user'], + }); + logger.debug(`[mongoSearch] Updated index ${indexName} settings to make 'user' filterable`); + } catch (settingsError) { + logger.error( + `[mongoSearch] Error updating index settings for ${indexName}:`, + settingsError, + ); + } + })(); + + const attributesToIndex: string[] = [ + ...Object.entries(schema.obj).reduce((results, [key, value]) => { + const schemaValue = value as { meiliIndex?: boolean }; + return schemaValue.meiliIndex ? 
[...results, key] : results; + }, []), + ]; + + if (schema.obj.user && !attributesToIndex.includes('user')) { + attributesToIndex.push('user'); + logger.debug(`[mongoSearch] Added 'user' field to ${indexName} index attributes`); + } + + schema.loadClass( + createSearchMongooseModel({ provider, indexName, attributesToIndex, syncOptions }), + ); + + // Backward compatibility aliases: map old method names to new ones + schema.statics.syncWithMeili = schema.statics.syncWithSearch; + schema.statics.meiliSearch = schema.statics.searchIndex; + + // Register Mongoose hooks + schema.post('save', function (doc: DocumentWithSearchIndex, next) { + doc.postSaveHook?.(next); + }); + + schema.post('updateOne', function (doc: DocumentWithSearchIndex, next) { + doc.postUpdateHook?.(next); + }); + + schema.post('deleteOne', function (doc: DocumentWithSearchIndex, next) { + doc.postRemoveHook?.(next); + }); + + schema.pre('deleteMany', async function (next) { + if (!isEnabled) { + return next(); + } + + try { + const conditions = (this as Query).getQuery(); + const { batchSize, delayMs } = syncOptions; + + if (Object.prototype.hasOwnProperty.call(schema.obj, 'messages')) { + const deletedConvos = await mongoose + .model('Conversation') + .find(conditions as FilterQuery) + .select('conversationId') + .lean(); + + await processBatch(deletedConvos, batchSize, delayMs, async (batch) => { + const promises = batch.map((convo: Record) => + provider.deleteDocument('convos', convo.conversationId as string), + ); + await Promise.all(promises); + }); + } + + if (Object.prototype.hasOwnProperty.call(schema.obj, 'messageId')) { + const deletedMessages = await mongoose + .model('Message') + .find(conditions as FilterQuery) + .select('messageId') + .lean(); + + await processBatch(deletedMessages, batchSize, delayMs, async (batch) => { + const promises = batch.map((message: Record) => + provider.deleteDocument('messages', message.messageId as string), + ); + await Promise.all(promises); + }); + } + 
return next(); + } catch (error) { + if (isEnabled) { + logger.error( + '[SearchMongooseModel.deleteMany] There was an issue deleting indexes upon deletion. Next startup may trigger syncing.', + error, + ); + } + return next(); + } + }); + + schema.post('findOneAndUpdate', async function (doc: DocumentWithSearchIndex, next) { + if (!isEnabled) { + return next(); + } + + if (doc.unfinished) { + return next(); + } + + let searchDoc: SearchHit | null = null; + if (doc.messages) { + try { + searchDoc = await provider.getDocument('convos', doc.conversationId as string); + } catch (error: unknown) { + logger.debug( + '[SearchMongooseModel.findOneAndUpdate] Convo not found in search index and will index ' + + doc.conversationId, + error as Record, + ); + } + } + + if (searchDoc && searchDoc.title === doc.title) { + return next(); + } + + doc.postSaveHook?.(next); + }); +} diff --git a/packages/data-schemas/src/models/plugins/search/index.ts b/packages/data-schemas/src/models/plugins/search/index.ts new file mode 100644 index 000000000000..9ba56ed5af21 --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/index.ts @@ -0,0 +1,20 @@ +export type { + SearchProvider, + SearchProviderConfig, + SearchHit, + SearchResult, + SearchParams, + IndexSettings, +} from './searchProvider'; +export { MeiliSearchProvider } from './meiliSearchProvider'; +export type { MeiliSearchProviderOptions } from './meiliSearchProvider'; +export { OpenSearchProvider } from './openSearchProvider'; +export type { OpenSearchProviderOptions } from './openSearchProvider'; +export { TypesenseProvider } from './typesenseProvider'; +export type { TypesenseProviderOptions } from './typesenseProvider'; +export { + getSearchProvider, + resetSearchProvider, + detectSearchProvider, + isSearchEnabled, +} from './searchProviderFactory'; diff --git a/packages/data-schemas/src/models/plugins/search/meiliSearchProvider.ts b/packages/data-schemas/src/models/plugins/search/meiliSearchProvider.ts new file 
mode 100644 index 000000000000..a2e535df3b8d --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/meiliSearchProvider.ts @@ -0,0 +1,176 @@ +import { MeiliSearch } from 'meilisearch'; +import type { SearchParams as MeiliSearchParams } from 'meilisearch'; +import type { + SearchProvider, + SearchHit, + SearchResult, + SearchParams, + IndexSettings, +} from './searchProvider'; +import logger from '~/config/meiliLogger'; + +export interface MeiliSearchProviderOptions { + host: string; + apiKey: string; +} + +/** + * MeiliSearch implementation of the SearchProvider interface. + * Wraps the existing MeiliSearch client to conform to the abstraction layer. + */ +export class MeiliSearchProvider implements SearchProvider { + readonly name = 'meilisearch'; + private client: MeiliSearch; + + constructor(options: MeiliSearchProviderOptions) { + if (!options.host || !options.apiKey) { + throw new Error('MeiliSearch provider requires host and apiKey'); + } + this.client = new MeiliSearch({ + host: options.host, + apiKey: options.apiKey, + }); + } + + /** Expose the raw MeiliSearch client for backward-compatible code paths */ + getClient(): MeiliSearch { + return this.client; + } + + async healthCheck(): Promise { + try { + const { status } = await this.client.health(); + return status === 'available'; + } catch (error) { + logger.debug('[MeiliSearchProvider] Health check failed:', error); + return false; + } + } + + async createIndex(indexName: string, primaryKey: string): Promise { + const index = this.client.index(indexName); + try { + await index.getRawInfo(); + logger.debug(`[MeiliSearchProvider] Index ${indexName} already exists`); + } catch (error) { + const errorCode = (error as { code?: string })?.code; + if (errorCode === 'index_not_found') { + try { + logger.info(`[MeiliSearchProvider] Creating new index: ${indexName}`); + await this.client.createIndex(indexName, { primaryKey }); + logger.info(`[MeiliSearchProvider] Successfully created index: 
${indexName}`); + } catch (createError) { + logger.debug( + `[MeiliSearchProvider] Index ${indexName} may already exist:`, + createError, + ); + } + } else { + logger.error(`[MeiliSearchProvider] Error checking index ${indexName}:`, error); + } + } + } + + async updateIndexSettings(indexName: string, settings: IndexSettings): Promise { + try { + const index = this.client.index(indexName); + await index.updateSettings({ + filterableAttributes: settings.filterableAttributes, + searchableAttributes: settings.searchableAttributes, + sortableAttributes: settings.sortableAttributes, + }); + logger.debug(`[MeiliSearchProvider] Updated index ${indexName} settings`); + } catch (error) { + logger.error(`[MeiliSearchProvider] Error updating index settings for ${indexName}:`, error); + } + } + + async getIndexSettings(indexName: string): Promise { + try { + const index = this.client.index(indexName); + const settings = await index.getSettings(); + return { + filterableAttributes: settings.filterableAttributes as string[], + searchableAttributes: settings.searchableAttributes as string[], + sortableAttributes: settings.sortableAttributes as string[], + }; + } catch (error) { + logger.error(`[MeiliSearchProvider] Error getting index settings for ${indexName}:`, error); + return {}; + } + } + + async addDocuments(indexName: string, documents: SearchHit[]): Promise { + const index = this.client.index(indexName); + await index.addDocuments(documents); + } + + async addDocumentsInBatches( + indexName: string, + documents: SearchHit[], + _primaryKey?: string, + batchSize?: number, + ): Promise { + const index = this.client.index(indexName); + await index.addDocumentsInBatches(documents, batchSize); + } + + async updateDocuments(indexName: string, documents: SearchHit[]): Promise { + const index = this.client.index(indexName); + await index.updateDocuments(documents); + } + + async deleteDocument(indexName: string, documentId: string): Promise { + const index = 
this.client.index(indexName); + await index.deleteDocument(documentId); + } + + async deleteDocuments(indexName: string, documentIds: string[]): Promise { + const index = this.client.index(indexName); + await index.deleteDocuments(documentIds); + } + + async getDocument(indexName: string, documentId: string): Promise { + try { + const index = this.client.index(indexName); + return await index.getDocument(documentId); + } catch (error) { + return null; + } + } + + async getDocuments( + indexName: string, + options: { limit: number; offset: number }, + ): Promise<{ results: SearchHit[] }> { + const index = this.client.index(indexName); + const result = await index.getDocuments(options); + return { results: result.results as SearchHit[] }; + } + + async search(indexName: string, query: string, params?: SearchParams): Promise { + const index = this.client.index(indexName); + const meiliParams: MeiliSearchParams = {}; + + if (params?.filter) { + meiliParams.filter = params.filter; + } + if (params?.limit !== undefined) { + meiliParams.limit = params.limit; + } + if (params?.offset !== undefined) { + meiliParams.offset = params.offset; + } + if (params?.sort) { + meiliParams.sort = params.sort; + } + + const result = await index.search(query, meiliParams); + return { + hits: result.hits as SearchHit[], + totalHits: result.estimatedTotalHits, + offset: result.offset, + limit: result.limit, + }; + } +} diff --git a/packages/data-schemas/src/models/plugins/search/openSearchProvider.spec.ts b/packages/data-schemas/src/models/plugins/search/openSearchProvider.spec.ts new file mode 100644 index 000000000000..ca47dfe8d002 --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/openSearchProvider.spec.ts @@ -0,0 +1,501 @@ +/** + * Tests for the OpenSearch Provider. 
+ * + * These tests verify: + * - Constructor validation + * - HTTP request building (auth headers, TLS) + * - Query/filter translation from MeiliSearch-style to OpenSearch DSL + * - All SearchProvider interface methods + */ + +import { OpenSearchProvider } from './openSearchProvider'; + +// Mock fetch globally +const mockFetch = jest.fn(); +global.fetch = mockFetch as unknown as typeof fetch; + +// Mock logger +jest.mock('~/config/meiliLogger', () => ({ + __esModule: true, + default: { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + }, +})); + +describe('OpenSearchProvider', () => { + let provider: OpenSearchProvider; + + beforeEach(() => { + mockFetch.mockReset(); + provider = new OpenSearchProvider({ + node: 'https://localhost:9200', + username: 'admin', + password: 'admin123', + insecure: false, + }); + }); + + describe('constructor', () => { + test('throws when node URL is missing', () => { + expect(() => new OpenSearchProvider({ node: '' })).toThrow( + 'OpenSearch provider requires a node URL', + ); + }); + + test('strips trailing slashes from node URL', () => { + const p = new OpenSearchProvider({ node: 'https://localhost:9200///' }); + expect((p as unknown as Record)['node']).toBe('https://localhost:9200'); + }); + + test('defaults username to "admin" when not provided', () => { + const p = new OpenSearchProvider({ node: 'https://localhost:9200', password: 'pass' }); + // Auth header should contain base64 of "admin:pass" + const expectedAuth = 'Basic ' + Buffer.from('admin:pass').toString('base64'); + expect((p as unknown as Record)['authHeader']).toBe(expectedAuth); + }); + + test('builds correct auth header', () => { + const expectedAuth = 'Basic ' + Buffer.from('admin:admin123').toString('base64'); + expect((provider as unknown as Record)['authHeader']).toBe(expectedAuth); + }); + }); + + describe('healthCheck', () => { + test('returns true when cluster status is green', async () => { + mockFetch.mockResolvedValueOnce({ + 
status: 200, + text: () => Promise.resolve(JSON.stringify({ status: 'green' })), + }); + + expect(await provider.healthCheck()).toBe(true); + expect(mockFetch).toHaveBeenCalledWith( + 'https://localhost:9200/_cluster/health', + expect.objectContaining({ method: 'GET' }), + ); + }); + + test('returns true when cluster status is yellow', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ status: 'yellow' })), + }); + + expect(await provider.healthCheck()).toBe(true); + }); + + test('returns false when cluster status is red', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ status: 'red' })), + }); + + expect(await provider.healthCheck()).toBe(false); + }); + + test('returns false on network error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Connection refused')); + + expect(await provider.healthCheck()).toBe(false); + }); + + test('returns false on non-2xx status', async () => { + mockFetch.mockResolvedValueOnce({ + status: 503, + text: () => Promise.resolve(JSON.stringify({ error: 'unavailable' })), + }); + + expect(await provider.healthCheck()).toBe(false); + }); + }); + + describe('createIndex', () => { + test('skips creation when index already exists', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(''), + }); + + await provider.createIndex('messages', 'messageId'); + + // Only HEAD request, no PUT + expect(mockFetch).toHaveBeenCalledTimes(1); + expect(mockFetch).toHaveBeenCalledWith( + 'https://localhost:9200/messages', + expect.objectContaining({ method: 'HEAD' }), + ); + }); + + test('creates index when it does not exist', async () => { + mockFetch + .mockResolvedValueOnce({ + status: 404, + text: () => Promise.resolve('{"error":"index_not_found"}'), + }) + .mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{"acknowledged":true}'), + }); + + await 
provider.createIndex('messages', 'messageId'); + + expect(mockFetch).toHaveBeenCalledTimes(2); + const putCall = mockFetch.mock.calls[1]; + expect(putCall[0]).toBe('https://localhost:9200/messages'); + expect(putCall[1].method).toBe('PUT'); + + const body = JSON.parse(putCall[1].body); + expect(body.mappings.properties.messageId.type).toBe('keyword'); + expect(body.mappings.properties.user.type).toBe('keyword'); + }); + }); + + describe('addDocuments', () => { + test('does nothing for empty array', async () => { + await provider.addDocuments('messages', []); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + test('sends bulk request with correct format', async () => { + mockFetch.mockResolvedValueOnce({ + json: () => Promise.resolve({ errors: false, items: [] }), + }); + + await provider.addDocuments( + 'messages', + [ + { messageId: 'msg1', text: 'hello', user: 'user1' }, + { messageId: 'msg2', text: 'world', user: 'user2' }, + ], + 'messageId', + ); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const call = mockFetch.mock.calls[0]; + expect(call[0]).toBe('https://localhost:9200/messages/_bulk'); + expect(call[1].headers['Content-Type']).toBe('application/x-ndjson'); + + const lines = (call[1].body as string).trim().split('\n'); + expect(lines.length).toBe(4); // 2 action + 2 document lines + + const action1 = JSON.parse(lines[0]); + expect(action1.index._id).toBe('msg1'); + expect(action1.index._index).toBe('messages'); + }); + }); + + describe('addDocumentsInBatches', () => { + test('splits documents into batches', async () => { + mockFetch + .mockResolvedValueOnce({ + json: () => Promise.resolve({ errors: false, items: [] }), + }) + .mockResolvedValueOnce({ + json: () => Promise.resolve({ errors: false, items: [] }), + }); + + const docs = Array.from({ length: 5 }, (_, i) => ({ + id: `doc${i}`, + text: `text${i}`, + })); + + await provider.addDocumentsInBatches('test', docs, 'id', 3); + + // Should be called twice: batch of 3 + batch of 2 + 
expect(mockFetch).toHaveBeenCalledTimes(2); + }); + }); + + describe('deleteDocument', () => { + test('sends DELETE request with encoded document ID', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{"result":"deleted"}'), + }); + + await provider.deleteDocument('messages', 'msg/1'); + + expect(mockFetch).toHaveBeenCalledWith( + 'https://localhost:9200/messages/_doc/msg%2F1', + expect.objectContaining({ method: 'DELETE' }), + ); + }); + }); + + describe('deleteDocuments', () => { + test('does nothing for empty array', async () => { + await provider.deleteDocuments('messages', []); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + test('sends bulk delete request', async () => { + mockFetch.mockResolvedValueOnce({ + json: () => Promise.resolve({ errors: false, items: [] }), + }); + + await provider.deleteDocuments('messages', ['msg1', 'msg2']); + + const call = mockFetch.mock.calls[0]; + const lines = (call[1].body as string).trim().split('\n'); + expect(lines.length).toBe(2); + + const action1 = JSON.parse(lines[0]); + expect(action1.delete._id).toBe('msg1'); + }); + }); + + describe('getDocument', () => { + test('returns document source on success', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + _source: { messageId: 'msg1', text: 'hello' }, + }), + ), + }); + + const doc = await provider.getDocument('messages', 'msg1'); + expect(doc).toEqual({ messageId: 'msg1', text: 'hello' }); + }); + + test('returns null when document not found', async () => { + mockFetch.mockResolvedValueOnce({ + status: 404, + text: () => Promise.resolve('{"found":false}'), + }); + + const doc = await provider.getDocument('messages', 'nonexistent'); + expect(doc).toBeNull(); + }); + + test('returns null on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const doc = await provider.getDocument('messages', 'msg1'); + 
expect(doc).toBeNull(); + }); + }); + + describe('search', () => { + test('performs match_all query when no query string', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { hits: [], total: { value: 0 } }, + }), + ), + }); + + await provider.search('messages', ''); + + const call = mockFetch.mock.calls[0]; + const body = JSON.parse(call[1].body); + expect(body.query).toEqual({ match_all: {} }); + }); + + test('performs multi_match query with fuzziness', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { + hits: [{ _source: { messageId: 'msg1', text: 'hello world' } }], + total: { value: 1 }, + }, + }), + ), + }); + + const result = await provider.search('messages', 'hello'); + + expect(result.hits).toHaveLength(1); + expect(result.totalHits).toBe(1); + + const call = mockFetch.mock.calls[0]; + const body = JSON.parse(call[1].body); + expect(body.query.bool.must[0].multi_match.query).toBe('hello'); + expect(body.query.bool.must[0].multi_match.fuzziness).toBe('AUTO'); + }); + + test('translates MeiliSearch-style filter to OpenSearch term filter', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { hits: [], total: { value: 0 } }, + }), + ), + }); + + await provider.search('messages', 'hello', { + filter: 'user = "user123"', + }); + + const call = mockFetch.mock.calls[0]; + const body = JSON.parse(call[1].body); + expect(body.query.bool.filter).toEqual([{ term: { user: 'user123' } }]); + }); + + test('handles AND filters', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { hits: [], total: { value: 0 } }, + }), + ), + }); + + await provider.search('messages', 'hello', { + filter: 'user = "user123" AND status = "active"', + }); + + const call = mockFetch.mock.calls[0]; + 
const body = JSON.parse(call[1].body); + expect(body.query.bool.filter).toEqual([ + { term: { user: 'user123' } }, + { term: { status: 'active' } }, + ]); + }); + + test('applies limit and offset', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { hits: [], total: { value: 0 } }, + }), + ), + }); + + await provider.search('messages', 'hello', { limit: 10, offset: 20 }); + + const call = mockFetch.mock.calls[0]; + const body = JSON.parse(call[1].body); + expect(body.size).toBe(10); + expect(body.from).toBe(20); + }); + + test('applies sort parameters', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { hits: [], total: { value: 0 } }, + }), + ), + }); + + await provider.search('messages', 'hello', { + sort: ['createdAt:desc', 'title:asc'], + }); + + const call = mockFetch.mock.calls[0]; + const body = JSON.parse(call[1].body); + expect(body.sort).toEqual([ + { createdAt: { order: 'desc' } }, + { title: { order: 'asc' } }, + ]); + }); + + test('returns empty result on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const result = await provider.search('messages', 'hello'); + expect(result.hits).toEqual([]); + expect(result.totalHits).toBe(0); + }); + }); + + describe('getIndexSettings', () => { + test('extracts filterable attributes from keyword-typed fields', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + messages: { + mappings: { + properties: { + user: { type: 'keyword' }, + messageId: { type: 'keyword' }, + text: { type: 'text' }, + }, + }, + }, + }), + ), + }); + + const settings = await provider.getIndexSettings('messages'); + expect(settings.filterableAttributes).toContain('user'); + expect(settings.filterableAttributes).toContain('messageId'); + 
expect(settings.filterableAttributes).not.toContain('text'); + }); + + test('returns empty settings on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const settings = await provider.getIndexSettings('messages'); + expect(settings).toEqual({}); + }); + }); + + describe('updateIndexSettings', () => { + test('updates mappings for filterable attributes', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{"acknowledged":true}'), + }); + + await provider.updateIndexSettings('messages', { + filterableAttributes: ['user', 'status'], + }); + + const call = mockFetch.mock.calls[0]; + expect(call[0]).toBe('https://localhost:9200/messages/_mapping'); + const body = JSON.parse(call[1].body); + expect(body.properties.user.type).toBe('keyword'); + expect(body.properties.status.type).toBe('keyword'); + }); + + test('does nothing when no filterable attributes', async () => { + await provider.updateIndexSettings('messages', {}); + expect(mockFetch).not.toHaveBeenCalled(); + }); + }); + + describe('getDocuments', () => { + test('returns documents with pagination', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + hits: { + hits: [ + { _id: '1', _source: { messageId: 'msg1' } }, + { _id: '2', _source: { messageId: 'msg2' } }, + ], + }, + }), + ), + }); + + const result = await provider.getDocuments('messages', { limit: 10, offset: 0 }); + expect(result.results).toHaveLength(2); + expect(result.results[0]).toHaveProperty('messageId', 'msg1'); + }); + }); +}); diff --git a/packages/data-schemas/src/models/plugins/search/openSearchProvider.ts b/packages/data-schemas/src/models/plugins/search/openSearchProvider.ts new file mode 100644 index 000000000000..8716fa67bd98 --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/openSearchProvider.ts @@ -0,0 +1,503 @@ +import type { + SearchProvider, + SearchHit, + SearchResult, 
+ SearchParams, + IndexSettings, +} from './searchProvider'; +import logger from '~/config/meiliLogger'; + +/** + * Minimal OpenSearch HTTP client. + * + * Uses the built-in fetch API (Node 18+) to communicate with OpenSearch's REST API, + * avoiding a heavy SDK dependency. This keeps the footprint small and the provider + * self-contained within the data-schemas package. + */ + +export interface OpenSearchProviderOptions { + /** OpenSearch node URL, e.g. https://localhost:9200 */ + node: string; + /** HTTP Basic Auth username (default: 'admin') */ + username?: string; + /** HTTP Basic Auth password */ + password?: string; + /** Skip TLS certificate verification (useful for self-signed certs in dev) */ + insecure?: boolean; + /** Connection timeout in milliseconds (default: 30000) */ + connectionTimeoutMs?: number; +} + +interface OpenSearchBulkResponseItem { + index?: { _id?: string; status?: number; error?: unknown }; + delete?: { _id?: string; status?: number; error?: unknown }; +} + +export class OpenSearchProvider implements SearchProvider { + readonly name = 'opensearch'; + private node: string; + private authHeader: string; + private insecure: boolean; + private connectionTimeoutMs: number; + + constructor(options: OpenSearchProviderOptions) { + if (!options.node) { + throw new Error('OpenSearch provider requires a node URL'); + } + this.node = options.node.replace(/\/+$/, ''); + const username = options.username || 'admin'; + const password = options.password || ''; + this.authHeader = 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64'); + this.insecure = options.insecure ?? false; + this.connectionTimeoutMs = options.connectionTimeoutMs ?? 
30000; + } + + // ------------------------------------------------------------------ // + // Internal HTTP helpers // + // ------------------------------------------------------------------ // + + private async request( + method: string, + path: string, + body?: unknown, + ): Promise<{ status: number; data: Record }> { + const url = `${this.node}${path}`; + const headers: Record = { + 'Content-Type': 'application/json', + Authorization: this.authHeader, + }; + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), this.connectionTimeoutMs); + + const fetchOptions: RequestInit & { dispatcher?: unknown } = { + method, + headers, + body: body !== undefined ? JSON.stringify(body) : undefined, + signal: controller.signal, + }; + + // Support insecure TLS via undici dispatcher when available + if (this.insecure) { + try { + // Use require() so Rollup externalizes this instead of code-splitting + // eslint-disable-next-line @typescript-eslint/no-var-requires + const { Agent } = require('undici') as { Agent: new (opts: Record) => unknown }; + fetchOptions.dispatcher = new Agent({ + connect: { rejectUnauthorized: false }, + }); + } catch (error) { + logger.warn( + '[OpenSearchProvider] Insecure TLS requested but "undici" is not available. ' + + 'Falling back to default TLS verification. 
To enable insecure TLS, install the "undici" package.', + { error }, + ); + } + } + + try { + const response = await fetch(url, fetchOptions); + const text = await response.text(); + let data: Record = {}; + try { + data = JSON.parse(text) as Record; + } catch { + data = { raw: text }; + } + return { status: response.status, data }; + } finally { + clearTimeout(timeout); + } + } + + private async bulkRequest( + indexName: string, + operations: string[], + ): Promise<{ errors: boolean; items: OpenSearchBulkResponseItem[] }> { + const url = `${this.node}/${indexName}/_bulk`; + const headers: Record = { + 'Content-Type': 'application/x-ndjson', + Authorization: this.authHeader, + }; + + const bodyStr = operations.join('\n') + '\n'; + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), this.connectionTimeoutMs * 2); + + const fetchOptions: RequestInit & { dispatcher?: unknown } = { + method: 'POST', + headers, + body: bodyStr, + signal: controller.signal, + }; + + if (this.insecure) { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const { Agent } = require('undici') as { Agent: new (opts: Record) => unknown }; + fetchOptions.dispatcher = new Agent({ + connect: { rejectUnauthorized: false }, + }); + } catch { + // undici not available + } + } + + try { + const response = await fetch(url, fetchOptions); + const data = (await response.json()) as { + errors: boolean; + items: OpenSearchBulkResponseItem[]; + }; + return data; + } finally { + clearTimeout(timeout); + } + } + + // ------------------------------------------------------------------ // + // SearchProvider implementation // + // ------------------------------------------------------------------ // + + async healthCheck(): Promise { + try { + const { status, data } = await this.request('GET', '/_cluster/health'); + if (status >= 200 && status < 300) { + const clusterStatus = data.status as string | undefined; + return clusterStatus === 
'green' || clusterStatus === 'yellow'; + } + return false; + } catch (error) { + logger.debug('[OpenSearchProvider] Health check failed:', error); + return false; + } + } + + async createIndex(indexName: string, primaryKey: string): Promise { + try { + // Check if index exists + const { status } = await this.request('HEAD', `/${indexName}`); + if (status === 200) { + logger.debug(`[OpenSearchProvider] Index ${indexName} already exists`); + return; + } + } catch { + // Index doesn't exist, create it + } + + try { + const mappings: Record = { + properties: { + [primaryKey]: { type: 'keyword' }, + user: { type: 'keyword' }, + }, + }; + + await this.request('PUT', `/${indexName}`, { + settings: { + number_of_shards: 1, + number_of_replicas: 1, + }, + mappings, + }); + logger.info(`[OpenSearchProvider] Created index: ${indexName}`); + } catch (error) { + logger.debug(`[OpenSearchProvider] Index ${indexName} may already exist:`, error); + } + } + + async updateIndexSettings(indexName: string, settings: IndexSettings): Promise { + try { + // In OpenSearch, filterable/sortable attributes are handled via mappings. + // We update the mapping to ensure the specified fields are keyword-typed. 
+ if (settings.filterableAttributes && settings.filterableAttributes.length > 0) { + const properties: Record = {}; + for (const attr of settings.filterableAttributes) { + properties[attr] = { type: 'keyword' }; + } + await this.request('PUT', `/${indexName}/_mapping`, { properties }); + logger.debug(`[OpenSearchProvider] Updated mappings for ${indexName}`); + } + } catch (error) { + logger.error( + `[OpenSearchProvider] Error updating index settings for ${indexName}:`, + error, + ); + } + } + + async getIndexSettings(indexName: string): Promise { + try { + const { data } = await this.request('GET', `/${indexName}/_mapping`); + const indexData = data[indexName] as Record | undefined; + const mappings = indexData?.mappings as Record | undefined; + const properties = mappings?.properties as Record> | undefined; + + const filterableAttributes: string[] = []; + if (properties) { + for (const [key, value] of Object.entries(properties)) { + if (value.type === 'keyword') { + filterableAttributes.push(key); + } + } + } + return { filterableAttributes }; + } catch (error) { + logger.error( + `[OpenSearchProvider] Error getting index settings for ${indexName}:`, + error, + ); + return {}; + } + } + + async addDocuments( + indexName: string, + documents: SearchHit[], + primaryKey?: string, + ): Promise { + if (documents.length === 0) { + return; + } + + const operations: string[] = []; + for (const doc of documents) { + const id = primaryKey ? String(doc[primaryKey]) : undefined; + const action = id + ? 
JSON.stringify({ index: { _index: indexName, _id: id } }) + : JSON.stringify({ index: { _index: indexName } }); + operations.push(action); + operations.push(JSON.stringify(doc)); + } + + const result = await this.bulkRequest(indexName, operations); + if (result.errors) { + const errorItems = result.items.filter( + (item) => item.index?.error, + ); + logger.error( + `[OpenSearchProvider] Bulk index errors in ${indexName}: ${errorItems.length} failures`, + ); + } + } + + async addDocumentsInBatches( + indexName: string, + documents: SearchHit[], + primaryKey?: string, + batchSize: number = 100, + ): Promise { + for (let i = 0; i < documents.length; i += batchSize) { + const batch = documents.slice(i, i + batchSize); + await this.addDocuments(indexName, batch, primaryKey); + } + } + + async updateDocuments(indexName: string, documents: SearchHit[]): Promise { + // OpenSearch uses the same index API for upserts + await this.addDocuments(indexName, documents); + } + + async deleteDocument(indexName: string, documentId: string): Promise { + try { + await this.request('DELETE', `/${indexName}/_doc/${encodeURIComponent(documentId)}`); + } catch (error) { + logger.error( + `[OpenSearchProvider] Error deleting document ${documentId} from ${indexName}:`, + error, + ); + } + } + + async deleteDocuments(indexName: string, documentIds: string[]): Promise { + if (documentIds.length === 0) { + return; + } + + const operations: string[] = []; + for (const id of documentIds) { + operations.push(JSON.stringify({ delete: { _index: indexName, _id: id } })); + } + + const result = await this.bulkRequest(indexName, operations); + if (result.errors) { + const errorItems = result.items.filter( + (item) => item.delete?.error, + ); + logger.error( + `[OpenSearchProvider] Bulk delete errors in ${indexName}: ${errorItems.length} failures`, + ); + } + } + + async getDocument(indexName: string, documentId: string): Promise { + try { + const { status, data } = await this.request( + 'GET', + 
`/${indexName}/_doc/${encodeURIComponent(documentId)}`, + ); + if (status === 200 && data._source) { + return data._source as SearchHit; + } + return null; + } catch { + return null; + } + } + + async getDocuments( + indexName: string, + options: { limit: number; offset: number }, + ): Promise<{ results: SearchHit[] }> { + try { + const { data } = await this.request('POST', `/${indexName}/_search`, { + query: { match_all: {} }, + size: options.limit, + from: options.offset, + }); + + const hits = data.hits as { hits?: Array<{ _source: SearchHit; _id: string }> } | undefined; + const results: SearchHit[] = (hits?.hits || []).map((hit) => ({ + ...hit._source, + _opensearch_id: hit._id, + })); + return { results }; + } catch (error) { + logger.error(`[OpenSearchProvider] Error getting documents from ${indexName}:`, error); + return { results: [] }; + } + } + + async search(indexName: string, query: string, params?: SearchParams): Promise { + try { + const searchBody: Record = {}; + + // Build the query + if (query) { + searchBody.query = this.buildQuery(query, params?.filter); + } else { + if (params?.filter) { + searchBody.query = this.buildFilterQuery(params.filter); + } else { + searchBody.query = { match_all: {} }; + } + } + + if (params?.limit !== undefined) { + searchBody.size = params.limit; + } + if (params?.offset !== undefined) { + searchBody.from = params.offset; + } + if (params?.sort) { + searchBody.sort = params.sort.map((s) => { + const match = s.match(/^(.+?):(asc|desc)$/i); + const field = match ? match[1] : s; + const order = match ? 
match[2].toLowerCase() : 'asc'; + return { [field]: { order } }; + }); + } + + const { data } = await this.request('POST', `/${indexName}/_search`, searchBody); + + const hitsData = data.hits as { + hits?: Array<{ _source: SearchHit; _id: string }>; + total?: { value?: number } | number; + } | undefined; + + const hits: SearchHit[] = (hitsData?.hits || []).map((hit) => { + const source = hit._source || ({} as SearchHit); + const result: SearchHit = { + ...source, + _opensearch_id: hit._id, + }; + if (!(source as Record).id) { + (result as Record).id = hit._id; + } + return result; + }); + + let totalHits = 0; + if (typeof hitsData?.total === 'object' && hitsData.total !== null) { + totalHits = hitsData.total.value ?? 0; + } else if (typeof hitsData?.total === 'number') { + totalHits = hitsData.total; + } + + return { + hits, + totalHits, + offset: params?.offset, + limit: params?.limit, + }; + } catch (error) { + logger.error(`[OpenSearchProvider] Error searching ${indexName}:`, error); + return { hits: [], totalHits: 0 }; + } + } + + // ------------------------------------------------------------------ // + // Query building helpers // + // ------------------------------------------------------------------ // + + /** + * Build an OpenSearch query from a text query and optional MeiliSearch-style filter. + * MeiliSearch filters use syntax like: `user = "userId"` + * We translate this to OpenSearch bool query with term filters. + */ + private buildQuery(query: string, filter?: string): Record { + const must: Record[] = [ + { + multi_match: { + query, + fields: ['*'], + type: 'best_fields', + fuzziness: 'AUTO', + }, + }, + ]; + + const filterClauses = filter ? this.parseFilter(filter) : []; + + return { + bool: { + must, + ...(filterClauses.length > 0 ? 
{ filter: filterClauses } : {}), + }, + }; + } + + private buildFilterQuery(filter: string): Record { + const filterClauses = this.parseFilter(filter); + if (filterClauses.length === 0) { + return { match_all: {} }; + } + return { + bool: { + filter: filterClauses, + }, + }; + } + + /** + * Parse a MeiliSearch-style filter string into OpenSearch filter clauses. + * Supports basic equality filters: `field = "value"` and `field = 'value'` + * Multiple filters can be combined with AND. + */ + private parseFilter(filter: string): Record[] { + const clauses: Record[] = []; + // Split on AND (case-insensitive) + const parts = filter.split(/\s+AND\s+/i); + + for (const part of parts) { + const match = part.trim().match(/^(\w+)\s*=\s*(["'])(.*)\2$/); + if (match) { + const [, field, , value] = match; + clauses.push({ term: { [field]: value } }); + } + } + + return clauses; + } +} diff --git a/packages/data-schemas/src/models/plugins/search/searchProvider.ts b/packages/data-schemas/src/models/plugins/search/searchProvider.ts new file mode 100644 index 000000000000..834d1ecd8674 --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/searchProvider.ts @@ -0,0 +1,144 @@ +/** + * Search Provider Abstraction Layer + * + * Defines the interface that all search engine backends must implement. + * This enables LibreChat to support multiple search engines (MeiliSearch, OpenSearch, etc.) + * in a modular, extensible way. + */ + +export interface SearchHit { + [key: string]: unknown; +} + +export interface SearchResult { + hits: SearchHit[]; + totalHits?: number; + offset?: number; + limit?: number; +} + +export interface SearchParams { + filter?: string; + limit?: number; + offset?: number; + sort?: string[]; +} + +export interface IndexSettings { + filterableAttributes?: string[]; + searchableAttributes?: string[]; + sortableAttributes?: string[]; +} + +/** + * Core interface that every search provider must implement. 
+ * Designed to be minimal yet sufficient for LibreChat's search needs. + */ +export interface SearchProvider { + /** Unique identifier for this provider (e.g., 'meilisearch', 'opensearch') */ + readonly name: string; + + /** + * Check if the search backend is healthy and available. + * @returns true if the backend is reachable and operational + */ + healthCheck(): Promise; + + /** + * Create an index if it does not already exist. + * @param indexName - Name of the index to create + * @param primaryKey - The primary key field for documents in this index + */ + createIndex(indexName: string, primaryKey: string): Promise; + + /** + * Update settings for an index (e.g., filterable attributes). + * @param indexName - Name of the index + * @param settings - Settings to apply + */ + updateIndexSettings(indexName: string, settings: IndexSettings): Promise; + + /** + * Get current settings for an index. + * @param indexName - Name of the index + */ + getIndexSettings(indexName: string): Promise; + + /** + * Add or replace documents in an index. + * @param indexName - Name of the index + * @param documents - Array of documents to add + * @param primaryKey - The primary key field name + */ + addDocuments(indexName: string, documents: SearchHit[], primaryKey?: string): Promise; + + /** + * Add documents in batches for large datasets. + * @param indexName - Name of the index + * @param documents - Array of documents to add + * @param primaryKey - The primary key field name + * @param batchSize - Number of documents per batch + */ + addDocumentsInBatches( + indexName: string, + documents: SearchHit[], + primaryKey?: string, + batchSize?: number, + ): Promise; + + /** + * Update existing documents in an index. + * @param indexName - Name of the index + * @param documents - Array of documents with updates + */ + updateDocuments(indexName: string, documents: SearchHit[]): Promise; + + /** + * Delete a single document by its ID. 
+ * @param indexName - Name of the index + * @param documentId - The document's primary key value + */ + deleteDocument(indexName: string, documentId: string): Promise; + + /** + * Delete multiple documents by their IDs. + * @param indexName - Name of the index + * @param documentIds - Array of document primary key values + */ + deleteDocuments(indexName: string, documentIds: string[]): Promise; + + /** + * Get a single document by its ID. + * @param indexName - Name of the index + * @param documentId - The document's primary key value + */ + getDocument(indexName: string, documentId: string): Promise; + + /** + * Get documents from an index with pagination. + * @param indexName - Name of the index + * @param options - Pagination options (limit, offset) + */ + getDocuments( + indexName: string, + options: { limit: number; offset: number }, + ): Promise<{ results: SearchHit[] }>; + + /** + * Search an index with a query string and optional parameters. + * @param indexName - Name of the index + * @param query - The search query string + * @param params - Optional search parameters (filters, pagination, etc.) + */ + search(indexName: string, query: string, params?: SearchParams): Promise; +} + +/** + * Configuration for search provider initialization. + */ +export interface SearchProviderConfig { + /** The provider type to use */ + provider: 'meilisearch' | 'opensearch' | 'typesense'; + /** Provider-specific connection options */ + options: Record; +} diff --git a/packages/data-schemas/src/models/plugins/search/searchProviderFactory.spec.ts b/packages/data-schemas/src/models/plugins/search/searchProviderFactory.spec.ts new file mode 100644 index 000000000000..07fff6e05eb1 --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/searchProviderFactory.spec.ts @@ -0,0 +1,273 @@ +/** + * Tests for the Search Provider Factory and detection logic. 
+ * + * These tests verify: + * - Provider auto-detection from environment variables + * - Explicit SEARCH_PROVIDER selection + * - Backward compatibility (MeiliSearch remains default) + * - Provider instantiation and caching + * - isSearchEnabled logic + */ + +import { + detectSearchProvider, + getSearchProvider, + resetSearchProvider, + isSearchEnabled, +} from './searchProviderFactory'; + +// Mock the provider modules to avoid real network calls +jest.mock('./meiliSearchProvider', () => ({ + MeiliSearchProvider: jest.fn().mockImplementation((opts: Record) => ({ + name: 'meilisearch', + host: opts.host, + healthCheck: jest.fn().mockResolvedValue(true), + })), +})); + +jest.mock('./openSearchProvider', () => ({ + OpenSearchProvider: jest.fn().mockImplementation((opts: Record) => ({ + name: 'opensearch', + node: opts.node, + healthCheck: jest.fn().mockResolvedValue(true), + })), +})); + +jest.mock('./typesenseProvider', () => ({ + TypesenseProvider: jest.fn().mockImplementation((opts: Record) => ({ + name: 'typesense', + node: opts.node, + healthCheck: jest.fn().mockResolvedValue(true), + })), +})); + +describe('Search Provider Factory', () => { + const OLD_ENV = process.env; + + beforeEach(() => { + // Reset env and cached provider before each test + process.env = { ...OLD_ENV }; + delete process.env.SEARCH_PROVIDER; + delete process.env.MEILI_HOST; + delete process.env.MEILI_MASTER_KEY; + delete process.env.OPENSEARCH_HOST; + delete process.env.OPENSEARCH_USERNAME; + delete process.env.OPENSEARCH_PASSWORD; + delete process.env.OPENSEARCH_INSECURE; + delete process.env.TYPESENSE_HOST; + delete process.env.TYPESENSE_API_KEY; + delete process.env.SEARCH; + resetSearchProvider(); + }); + + afterAll(() => { + process.env = OLD_ENV; + }); + + describe('detectSearchProvider', () => { + test('returns null when no search env vars are set', () => { + expect(detectSearchProvider()).toBeNull(); + }); + + test('returns "meilisearch" when MEILI_HOST and MEILI_MASTER_KEY are 
set', () => { + process.env.MEILI_HOST = 'http://localhost:7700'; + process.env.MEILI_MASTER_KEY = 'test-key'; + expect(detectSearchProvider()).toBe('meilisearch'); + }); + + test('returns null when only MEILI_HOST is set (missing key)', () => { + process.env.MEILI_HOST = 'http://localhost:7700'; + expect(detectSearchProvider()).toBeNull(); + }); + + test('returns "opensearch" when OPENSEARCH_HOST is set', () => { + process.env.OPENSEARCH_HOST = 'https://localhost:9200'; + expect(detectSearchProvider()).toBe('opensearch'); + }); + + test('returns "typesense" when TYPESENSE_HOST and TYPESENSE_API_KEY are set', () => { + process.env.TYPESENSE_HOST = 'http://localhost:8108'; + process.env.TYPESENSE_API_KEY = 'test-key'; + expect(detectSearchProvider()).toBe('typesense'); + }); + + test('returns null when only TYPESENSE_HOST is set (missing key)', () => { + process.env.TYPESENSE_HOST = 'http://localhost:8108'; + expect(detectSearchProvider()).toBeNull(); + }); + + test('OPENSEARCH_HOST takes priority over MEILI_HOST in auto-detection', () => { + process.env.MEILI_HOST = 'http://localhost:7700'; + process.env.MEILI_MASTER_KEY = 'test-key'; + process.env.OPENSEARCH_HOST = 'https://localhost:9200'; + expect(detectSearchProvider()).toBe('opensearch'); + }); + + test('OPENSEARCH_HOST takes priority over TYPESENSE in auto-detection', () => { + process.env.OPENSEARCH_HOST = 'https://localhost:9200'; + process.env.TYPESENSE_HOST = 'http://localhost:8108'; + process.env.TYPESENSE_API_KEY = 'test-key'; + expect(detectSearchProvider()).toBe('opensearch'); + }); + + test('TYPESENSE takes priority over MEILI in auto-detection', () => { + process.env.MEILI_HOST = 'http://localhost:7700'; + process.env.MEILI_MASTER_KEY = 'test-key'; + process.env.TYPESENSE_HOST = 'http://localhost:8108'; + process.env.TYPESENSE_API_KEY = 'test-key'; + expect(detectSearchProvider()).toBe('typesense'); + }); + + describe('explicit SEARCH_PROVIDER overrides auto-detection', () => { + 
// --- SEARCH_PROVIDER explicit override ---------------------------------
// An explicit SEARCH_PROVIDER value must beat every auto-detection signal
// (MEILI_*, OPENSEARCH_*, TYPESENSE_* env vars).
test('SEARCH_PROVIDER=meilisearch overrides OpenSearch env vars', () => {
  process.env.SEARCH_PROVIDER = 'meilisearch';
  process.env.OPENSEARCH_HOST = 'https://localhost:9200';
  expect(detectSearchProvider()).toBe('meilisearch');
});

test('SEARCH_PROVIDER=opensearch overrides MeiliSearch env vars', () => {
  process.env.SEARCH_PROVIDER = 'opensearch';
  process.env.MEILI_HOST = 'http://localhost:7700';
  process.env.MEILI_MASTER_KEY = 'test-key';
  expect(detectSearchProvider()).toBe('opensearch');
});

test('SEARCH_PROVIDER=typesense overrides other env vars', () => {
  process.env.SEARCH_PROVIDER = 'typesense';
  process.env.MEILI_HOST = 'http://localhost:7700';
  process.env.MEILI_MASTER_KEY = 'test-key';
  process.env.OPENSEARCH_HOST = 'https://localhost:9200';
  expect(detectSearchProvider()).toBe('typesense');
});

// Provider names are normalized with toLowerCase() before matching.
test('SEARCH_PROVIDER is case-insensitive', () => {
  process.env.SEARCH_PROVIDER = 'OpenSearch';
  expect(detectSearchProvider()).toBe('opensearch');

  process.env.SEARCH_PROVIDER = 'MEILISEARCH';
  expect(detectSearchProvider()).toBe('meilisearch');

  process.env.SEARCH_PROVIDER = 'Typesense';
  expect(detectSearchProvider()).toBe('typesense');
});

// Unrecognized SEARCH_PROVIDER values are ignored rather than rejected:
// detection proceeds as if the variable were unset.
test('unknown SEARCH_PROVIDER falls through to auto-detection', () => {
  process.env.SEARCH_PROVIDER = 'unknown_provider';
  process.env.MEILI_HOST = 'http://localhost:7700';
  process.env.MEILI_MASTER_KEY = 'test-key';
  expect(detectSearchProvider()).toBe('meilisearch');
});

test('unknown SEARCH_PROVIDER with no env vars returns null', () => {
  process.env.SEARCH_PROVIDER = 'unknown_provider';
  expect(detectSearchProvider()).toBeNull();
});
});
});

describe('getSearchProvider', () => {
  test('returns null when no provider is configured', () => {
    expect(getSearchProvider()).toBeNull();
  });

  // Each configured backend must yield a provider whose `name` identifies it.
  test('creates MeiliSearch provider when configured', () => {
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';

    const provider = getSearchProvider();
    expect(provider).not.toBeNull();
    expect(provider!.name).toBe('meilisearch');
  });

  test('creates OpenSearch provider when configured', () => {
    process.env.OPENSEARCH_HOST = 'https://localhost:9200';

    const provider = getSearchProvider();
    expect(provider).not.toBeNull();
    expect(provider!.name).toBe('opensearch');
  });

  test('creates Typesense provider when configured', () => {
    process.env.TYPESENSE_HOST = 'http://localhost:8108';
    process.env.TYPESENSE_API_KEY = 'test-key';

    const provider = getSearchProvider();
    expect(provider).not.toBeNull();
    expect(provider!.name).toBe('typesense');
  });

  // getSearchProvider caches its instance; identity equality proves it.
  test('caches provider instance (singleton)', () => {
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';

    const provider1 = getSearchProvider();
    const provider2 = getSearchProvider();
    expect(provider1).toBe(provider2);
  });

  test('resetSearchProvider clears the cached instance', () => {
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';

    const provider1 = getSearchProvider();
    resetSearchProvider();

    // Change config to OpenSearch
    delete process.env.MEILI_HOST;
    delete process.env.MEILI_MASTER_KEY;
    process.env.OPENSEARCH_HOST = 'https://localhost:9200';

    const provider2 = getSearchProvider();
    expect(provider1!.name).toBe('meilisearch');
    expect(provider2!.name).toBe('opensearch');
    expect(provider1).not.toBe(provider2);
  });
});

describe('isSearchEnabled', () => {
  // isSearchEnabled requires BOTH SEARCH=true AND a configured provider.
  test('returns false when SEARCH is not set', () => {
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';
    expect(isSearchEnabled()).toBe(false);
  });

  test('returns false when SEARCH is not "true"', () => {
    process.env.SEARCH = 'false';
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';

    expect(isSearchEnabled()).toBe(false);
  });

  test('returns false when SEARCH=true but no provider configured', () => {
    process.env.SEARCH = 'true';
    expect(isSearchEnabled()).toBe(false);
  });

  test('returns true when SEARCH=true and MeiliSearch configured', () => {
    process.env.SEARCH = 'true';
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';
    expect(isSearchEnabled()).toBe(true);
  });

  test('returns true when SEARCH=true and OpenSearch configured', () => {
    process.env.SEARCH = 'true';
    process.env.OPENSEARCH_HOST = 'https://localhost:9200';
    expect(isSearchEnabled()).toBe(true);
  });

  test('returns true when SEARCH=true and Typesense configured', () => {
    process.env.SEARCH = 'true';
    process.env.TYPESENSE_HOST = 'http://localhost:8108';
    process.env.TYPESENSE_API_KEY = 'test-key';
    expect(isSearchEnabled()).toBe(true);
  });

  test('SEARCH is case-insensitive', () => {
    process.env.SEARCH = 'TRUE';
    process.env.MEILI_HOST = 'http://localhost:7700';
    process.env.MEILI_MASTER_KEY = 'test-key';
    expect(isSearchEnabled()).toBe(true);
  });
});
});

// ---- searchProviderFactory.ts (new file in this diff) begins here ----
import type { SearchProvider } from './searchProvider';
import { OpenSearchProvider } from './openSearchProvider';
import { TypesenseProvider } from './typesenseProvider';
import { MeiliSearchProvider } from './meiliSearchProvider';
import logger from '~/config/meiliLogger';
If MEILI_HOST + MEILI_MASTER_KEY are set, use MeiliSearch (backward compatible default). + * 4. Otherwise, return null (search disabled). + * + * This ensures existing deployments using MEILI_HOST/MEILI_MASTER_KEY continue to work + * without any configuration changes. + */ +export function detectSearchProvider(): 'meilisearch' | 'opensearch' | 'typesense' | null { + const explicit = process.env.SEARCH_PROVIDER?.toLowerCase(); + + if (explicit === 'opensearch') { + return 'opensearch'; + } + if (explicit === 'meilisearch') { + return 'meilisearch'; + } + if (explicit === 'typesense') { + return 'typesense'; + } + + // Auto-detect based on available env vars + if (process.env.OPENSEARCH_HOST) { + return 'opensearch'; + } + if (process.env.TYPESENSE_HOST && process.env.TYPESENSE_API_KEY) { + return 'typesense'; + } + if (process.env.MEILI_HOST && process.env.MEILI_MASTER_KEY) { + return 'meilisearch'; + } + + return null; +} + +/** + * Singleton search provider instance. + */ +let providerInstance: SearchProvider | null = null; + +/** + * Creates and returns a SearchProvider based on environment configuration. + * Returns null if search is not configured. + * + * Uses lazy initialization — the provider is created on first call and cached. 
+ */ +export function getSearchProvider(): SearchProvider | null { + if (providerInstance) { + return providerInstance; + } + + const providerType = detectSearchProvider(); + if (!providerType) { + return null; + } + + try { + if (providerType === 'opensearch') { + providerInstance = new OpenSearchProvider({ + node: process.env.OPENSEARCH_HOST as string, + username: process.env.OPENSEARCH_USERNAME, + password: process.env.OPENSEARCH_PASSWORD, + insecure: process.env.OPENSEARCH_INSECURE === 'true', + }); + logger.info('[SearchProviderFactory] Using OpenSearch provider'); + } else if (providerType === 'typesense') { + providerInstance = new TypesenseProvider({ + node: process.env.TYPESENSE_HOST as string, + apiKey: process.env.TYPESENSE_API_KEY as string, + }); + logger.info('[SearchProviderFactory] Using Typesense provider'); + } else { + providerInstance = new MeiliSearchProvider({ + host: process.env.MEILI_HOST as string, + apiKey: process.env.MEILI_MASTER_KEY as string, + }); + logger.info('[SearchProviderFactory] Using MeiliSearch provider'); + } + } catch (error) { + logger.error(`[SearchProviderFactory] Failed to create ${providerType} provider:`, error); + return null; + } + + return providerInstance; +} + +/** + * Reset the cached provider instance. Useful for testing. + */ +export function resetSearchProvider(): void { + providerInstance = null; +} + +/** + * Check if search is enabled based on environment variables. 
+ */ +export function isSearchEnabled(): boolean { + const searchFlag = process.env.SEARCH; + if (!searchFlag || searchFlag.toLowerCase() !== 'true') { + return false; + } + return detectSearchProvider() !== null; +} diff --git a/packages/data-schemas/src/models/plugins/search/typesenseProvider.spec.ts b/packages/data-schemas/src/models/plugins/search/typesenseProvider.spec.ts new file mode 100644 index 000000000000..97d484604f5c --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/typesenseProvider.spec.ts @@ -0,0 +1,598 @@ +/** + * Tests for the Typesense Provider. + * + * These tests verify: + * - Constructor validation + * - HTTP request building (API key header, timeouts) + * - Query/filter translation from MeiliSearch-style to Typesense syntax + * - All SearchProvider interface methods + */ + +import { TypesenseProvider } from './typesenseProvider'; + +// Mock fetch globally +const mockFetch = jest.fn(); +global.fetch = mockFetch as unknown as typeof fetch; + +// Mock logger +jest.mock('~/config/meiliLogger', () => ({ + __esModule: true, + default: { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + }, +})); + +describe('TypesenseProvider', () => { + let provider: TypesenseProvider; + + beforeEach(() => { + mockFetch.mockReset(); + provider = new TypesenseProvider({ + node: 'http://localhost:8108', + apiKey: 'test-api-key', + }); + }); + + describe('constructor', () => { + test('throws when node URL is missing', () => { + expect(() => new TypesenseProvider({ node: '', apiKey: 'key' })).toThrow( + 'Typesense provider requires a node URL', + ); + }); + + test('throws when API key is missing', () => { + expect(() => new TypesenseProvider({ node: 'http://localhost:8108', apiKey: '' })).toThrow( + 'Typesense provider requires an API key', + ); + }); + + test('strips trailing slashes from node URL', () => { + const p = new TypesenseProvider({ node: 'http://localhost:8108///', apiKey: 'key' }); + expect((p as unknown as 
Record)['node']).toBe('http://localhost:8108'); + }); + }); + + describe('healthCheck', () => { + test('returns true when health endpoint returns 200', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{"ok":true}'), + }); + + expect(await provider.healthCheck()).toBe(true); + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:8108/health', + expect.objectContaining({ + method: 'GET', + headers: expect.objectContaining({ + 'X-TYPESENSE-API-KEY': 'test-api-key', + }), + }), + ); + }); + + test('returns false on non-200 status', async () => { + mockFetch.mockResolvedValueOnce({ + status: 503, + text: () => Promise.resolve('Service Unavailable'), + }); + + expect(await provider.healthCheck()).toBe(false); + }); + + test('returns false on network error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Connection refused')); + + expect(await provider.healthCheck()).toBe(false); + }); + }); + + describe('createIndex', () => { + test('skips creation when collection already exists', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ name: 'messages' })), + }); + + await provider.createIndex('messages', 'messageId'); + + expect(mockFetch).toHaveBeenCalledTimes(1); + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:8108/collections/messages', + expect.objectContaining({ method: 'GET' }), + ); + }); + + test('creates messages collection with correct schema fields', async () => { + mockFetch + .mockResolvedValueOnce({ + status: 404, + text: () => Promise.resolve('{"message":"Not Found"}'), + }) + .mockResolvedValueOnce({ + status: 201, + text: () => Promise.resolve(JSON.stringify({ name: 'messages' })), + }); + + await provider.createIndex('messages', 'messageId'); + + expect(mockFetch).toHaveBeenCalledTimes(2); + const createCall = mockFetch.mock.calls[1]; + expect(createCall[0]).toBe('http://localhost:8108/collections'); + 
expect(createCall[1].method).toBe('POST'); + + const body = JSON.parse(createCall[1].body); + expect(body.name).toBe('messages'); + expect(body.fields).toEqual( + expect.arrayContaining([ + expect.objectContaining({ name: 'messageId', type: 'string' }), + expect.objectContaining({ name: 'conversationId', type: 'string' }), + expect.objectContaining({ name: 'user', type: 'string', facet: true }), + expect.objectContaining({ name: 'sender', type: 'string' }), + expect.objectContaining({ name: 'text', type: 'string' }), + expect.objectContaining({ name: '.*', type: 'auto' }), + ]), + ); + }); + + test('creates convos collection with title, user, tags fields', async () => { + mockFetch + .mockResolvedValueOnce({ + status: 404, + text: () => Promise.resolve('{"message":"Not Found"}'), + }) + .mockResolvedValueOnce({ + status: 201, + text: () => Promise.resolve(JSON.stringify({ name: 'convos' })), + }); + + await provider.createIndex('convos', 'conversationId'); + + const createCall = mockFetch.mock.calls[1]; + const body = JSON.parse(createCall[1].body); + expect(body.name).toBe('convos'); + expect(body.fields).toEqual( + expect.arrayContaining([ + expect.objectContaining({ name: 'conversationId', type: 'string' }), + expect.objectContaining({ name: 'title', type: 'string' }), + expect.objectContaining({ name: 'user', type: 'string', facet: true }), + expect.objectContaining({ name: 'tags', type: 'string[]' }), + expect.objectContaining({ name: '.*', type: 'auto' }), + ]), + ); + }); + }); + + describe('addDocuments', () => { + test('does nothing for empty array', async () => { + await provider.addDocuments('messages', []); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + test('sends import request with JSONL body and maps primaryKey to id', async () => { + // First register the index so primaryKey mapping is active + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ name: 'messages' })), + }); + await 
provider.createIndex('messages', 'messageId'); + mockFetch.mockClear(); + + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve('{"success":true}\n{"success":true}'), + }); + + await provider.addDocuments( + 'messages', + [ + { messageId: 'msg1', text: 'hello', user: 'user1' }, + { messageId: 'msg2', text: 'world', user: 'user2' }, + ], + 'messageId', + ); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const call = mockFetch.mock.calls[0]; + expect(call[0]).toContain('/collections/messages/documents/import?action=upsert'); + expect(call[1].headers['Content-Type']).toBe('text/plain'); + + const lines = (call[1].body as string).split('\n'); + expect(lines).toHaveLength(2); + const doc1 = JSON.parse(lines[0]); + expect(doc1.id).toBe('msg1'); + expect(doc1.messageId).toBe('msg1'); + expect(doc1.text).toBe('hello'); + }); + + test('strips null/undefined values and stringifies objects', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ name: 'convos' })), + }); + await provider.createIndex('convos', 'conversationId'); + mockFetch.mockClear(); + + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve('{"success":true}'), + }); + + await provider.addDocuments( + 'convos', + [ + { + conversationId: 'conv1', + title: 'Test', + user: 'user1', + nullField: null, + nestedObj: { foo: 'bar' }, + }, + ], + 'conversationId', + ); + + const call = mockFetch.mock.calls[0]; + const doc = JSON.parse((call[1].body as string).split('\n')[0]); + expect(doc.id).toBe('conv1'); + expect(doc.nullField).toBeUndefined(); + expect(doc.nestedObj).toBe('{"foo":"bar"}'); + }); + }); + + describe('addDocumentsInBatches', () => { + test('splits documents into batches', async () => { + mockFetch + .mockResolvedValueOnce({ + text: () => Promise.resolve('{"success":true}\n{"success":true}\n{"success":true}'), + }) + .mockResolvedValueOnce({ + text: () => 
Promise.resolve('{"success":true}\n{"success":true}'), + }); + + const docs = Array.from({ length: 5 }, (_, i) => ({ + id: `doc${i}`, + text: `text${i}`, + })); + + await provider.addDocumentsInBatches('test', docs, 'id', 3); + + expect(mockFetch).toHaveBeenCalledTimes(2); + }); + }); + + describe('deleteDocument', () => { + test('sends DELETE request with encoded document ID', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{}'), + }); + + await provider.deleteDocument('messages', 'msg/1'); + + expect(mockFetch).toHaveBeenCalledWith( + 'http://localhost:8108/collections/messages/documents/msg%2F1', + expect.objectContaining({ method: 'DELETE' }), + ); + }); + }); + + describe('deleteDocuments', () => { + test('does nothing for empty array', async () => { + await provider.deleteDocuments('messages', []); + expect(mockFetch).not.toHaveBeenCalled(); + }); + + test('sends batch delete with filter_by', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{"num_deleted":2}'), + }); + + await provider.deleteDocuments('messages', ['msg1', 'msg2']); + + const call = mockFetch.mock.calls[0]; + expect(call[0]).toContain('/collections/messages/documents?filter_by='); + expect(call[1].method).toBe('DELETE'); + }); + }); + + describe('getDocument', () => { + test('returns document on success', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve(JSON.stringify({ id: 'msg1', text: 'hello', user: 'user1' })), + }); + + const doc = await provider.getDocument('messages', 'msg1'); + expect(doc).toEqual({ id: 'msg1', text: 'hello', user: 'user1' }); + }); + + test('returns null when document not found', async () => { + mockFetch.mockResolvedValueOnce({ + status: 404, + text: () => Promise.resolve('{"message":"Not Found"}'), + }); + + const doc = await provider.getDocument('messages', 'nonexistent'); + expect(doc).toBeNull(); + }); + + test('returns 
null on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const doc = await provider.getDocument('messages', 'msg1'); + expect(doc).toBeNull(); + }); + }); + + describe('search', () => { + beforeEach(async () => { + // Register the messages index so query_by fields are available + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ name: 'messages' })), + }); + await provider.createIndex('messages', 'messageId'); + mockFetch.mockClear(); + }); + + test('performs wildcard search with correct query_by fields', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 0, + hits: [], + page: 1, + }), + ), + }); + + await provider.search('messages', ''); + + const call = mockFetch.mock.calls[0]; + const url = decodeURIComponent(call[0]); + expect(url).toContain('q=*'); + // query_by should use actual field names, not '*' + expect(url).toContain('query_by='); + expect(url).not.toContain('query_by=*'); + // Should include searchable fields (excluding primaryKey messageId) + expect(url).toMatch(/query_by=.*text/); + }); + + test('performs search with query', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 1, + hits: [{ document: { id: 'msg1', text: 'hello world' } }], + page: 1, + }), + ), + }); + + const result = await provider.search('messages', 'hello'); + + expect(result.hits).toHaveLength(1); + expect(result.hits[0]).toEqual({ id: 'msg1', text: 'hello world' }); + expect(result.totalHits).toBe(1); + + const call = mockFetch.mock.calls[0]; + expect(call[0]).toContain('q=hello'); + }); + + test('translates MeiliSearch-style filter to Typesense filter_by', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 0, + hits: [], + page: 1, + }), + ), + }); + + await 
provider.search('messages', 'hello', { + filter: 'user = "user123"', + }); + + const call = mockFetch.mock.calls[0]; + expect(decodeURIComponent(call[0])).toContain('filter_by=user:=user123'); + }); + + test('handles AND filters', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 0, + hits: [], + page: 1, + }), + ), + }); + + await provider.search('messages', 'hello', { + filter: 'user = "user123" AND status = "active"', + }); + + const call = mockFetch.mock.calls[0]; + const url = decodeURIComponent(call[0]); + expect(url).toContain('filter_by=user:=user123 && status:=active'); + }); + + test('applies limit and offset via per_page and page', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 0, + hits: [], + page: 3, + }), + ), + }); + + await provider.search('messages', 'hello', { limit: 10, offset: 20 }); + + const call = mockFetch.mock.calls[0]; + expect(call[0]).toContain('per_page=10'); + expect(call[0]).toContain('page=3'); // offset 20 / limit 10 + 1 = 3 + }); + + test('applies sort parameters', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 0, + hits: [], + page: 1, + }), + ), + }); + + await provider.search('messages', 'hello', { + sort: ['createdAt:desc', 'title:asc'], + }); + + const call = mockFetch.mock.calls[0]; + const url = decodeURIComponent(call[0]); + expect(url).toContain('sort_by=createdAt:desc,title:asc'); + }); + + test('returns empty result on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const result = await provider.search('messages', 'hello'); + expect(result.hits).toEqual([]); + expect(result.totalHits).toBe(0); + }); + }); + + describe('getIndexSettings', () => { + test('extracts filterable attributes from faceted fields', async () => { + 
mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + name: 'messages', + fields: [ + { name: 'user', type: 'string', facet: true }, + { name: 'messageId', type: 'string', facet: false }, + { name: 'text', type: 'string' }, + ], + }), + ), + }); + + const settings = await provider.getIndexSettings('messages'); + expect(settings.filterableAttributes).toContain('user'); + expect(settings.filterableAttributes).not.toContain('messageId'); + expect(settings.filterableAttributes).not.toContain('text'); + }); + + test('returns empty settings on error', async () => { + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + const settings = await provider.getIndexSettings('messages'); + expect(settings).toEqual({}); + }); + }); + + describe('updateIndexSettings', () => { + test('patches collection schema for filterable attributes', async () => { + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve('{}'), + }); + + await provider.updateIndexSettings('messages', { + filterableAttributes: ['user', 'status'], + }); + + const call = mockFetch.mock.calls[0]; + expect(call[0]).toBe('http://localhost:8108/collections/messages'); + expect(call[1].method).toBe('PATCH'); + + const body = JSON.parse(call[1].body); + expect(body.fields).toEqual( + expect.arrayContaining([ + expect.objectContaining({ name: 'user', type: 'string', facet: true }), + expect.objectContaining({ name: 'status', type: 'string', facet: true }), + ]), + ); + }); + + test('does nothing when no filterable attributes', async () => { + await provider.updateIndexSettings('messages', {}); + expect(mockFetch).not.toHaveBeenCalled(); + }); + }); + + describe('updateDocuments', () => { + test('uses upsert action for updates', async () => { + mockFetch.mockResolvedValueOnce({ + ok: true, + text: () => Promise.resolve('{"success":true}'), + }); + + await provider.updateDocuments('messages', [{ id: 'msg1', text: 'updated' }]); + + const 
call = mockFetch.mock.calls[0]; + expect(call[0]).toContain('action=upsert'); + }); + + test('does nothing for empty array', async () => { + await provider.updateDocuments('messages', []); + expect(mockFetch).not.toHaveBeenCalled(); + }); + }); + + describe('getDocuments', () => { + test('uses valid query_by fields for listing', async () => { + // Register index first + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ name: 'convos' })), + }); + await provider.createIndex('convos', 'conversationId'); + mockFetch.mockClear(); + + mockFetch.mockResolvedValueOnce({ + status: 200, + text: () => + Promise.resolve( + JSON.stringify({ + found: 1, + hits: [{ document: { id: 'conv1', title: 'Test', user: 'user1' } }], + page: 1, + }), + ), + }); + + const result = await provider.getDocuments('convos', { limit: 10, offset: 0 }); + + expect(result.results).toHaveLength(1); + const call = mockFetch.mock.calls[0]; + const url = decodeURIComponent(call[0]); + // Should use actual field names, not empty or '*' + expect(url).toContain('query_by=title,user'); + expect(url).toContain('q=*'); + }); + }); +}); diff --git a/packages/data-schemas/src/models/plugins/search/typesenseProvider.ts b/packages/data-schemas/src/models/plugins/search/typesenseProvider.ts new file mode 100644 index 000000000000..f4a41c9d765d --- /dev/null +++ b/packages/data-schemas/src/models/plugins/search/typesenseProvider.ts @@ -0,0 +1,555 @@ +import type { + SearchProvider, + SearchHit, + SearchResult, + SearchParams, + IndexSettings, +} from './searchProvider'; +import logger from '~/config/meiliLogger'; + +/** + * Typesense Search Provider + * + * Uses the built-in fetch API (Node 18+) to communicate with Typesense's REST API. + * Typesense requires explicit collection schemas with typed fields, unlike MeiliSearch + * and OpenSearch which are more schema-flexible. 
+ * + * Typesense API reference: https://typesense.org/api + * + * Key differences from MeiliSearch/OpenSearch: + * - Collections require a schema definition with field names and types + * - Search uses query parameters rather than JSON body + * - Authentication via X-TYPESENSE-API-KEY header + * - Default port is 8108 + * - Documents are accessed via /collections/{name}/documents + */ + +export interface TypesenseProviderOptions { + /** Typesense node URL, e.g. http://localhost:8108 */ + node: string; + /** API key for authentication */ + apiKey: string; + /** Connection timeout in milliseconds (default: 5000) */ + connectionTimeoutMs?: number; +} + +interface TypesenseField { + name: string; + type: string; + facet?: boolean; + optional?: boolean; + index?: boolean; +} + +interface TypesenseCollectionSchema { + name: string; + fields: TypesenseField[]; + default_sorting_field?: string; +} + +interface TypesenseSearchResponse { + found: number; + hits: Array<{ + document: SearchHit; + highlights?: unknown[]; + }>; + page: number; + request_params?: unknown; +} + +export class TypesenseProvider implements SearchProvider { + readonly name = 'typesense'; + private node: string; + private apiKey: string; + private connectionTimeoutMs: number; + /** Maps index name → primary key field name (e.g. 'convos' → 'conversationId') */ + private primaryKeyMap: Map = new Map(); + /** Maps index name → list of string field names for query_by */ + private searchableFieldsMap: Map = new Map(); + + constructor(options: TypesenseProviderOptions) { + if (!options.node) { + throw new Error('Typesense provider requires a node URL'); + } + if (!options.apiKey) { + throw new Error('Typesense provider requires an API key'); + } + this.node = options.node.replace(/\/+$/, ''); + this.apiKey = options.apiKey; + this.connectionTimeoutMs = options.connectionTimeoutMs ?? 
5000; + } + + // ------------------------------------------------------------------ // + // Internal HTTP helpers // + // ------------------------------------------------------------------ // + + private async request( + method: string, + path: string, + body?: unknown, + ): Promise<{ status: number; data: Record }> { + const url = `${this.node}${path}`; + const headers: Record = { + 'Content-Type': 'application/json', + 'X-TYPESENSE-API-KEY': this.apiKey, + }; + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), this.connectionTimeoutMs); + + try { + const response = await fetch(url, { + method, + headers, + body: body !== undefined ? JSON.stringify(body) : undefined, + signal: controller.signal, + }); + + const text = await response.text(); + let data: Record = {}; + try { + data = JSON.parse(text) as Record; + } catch { + data = { raw: text }; + } + return { status: response.status, data }; + } finally { + clearTimeout(timeout); + } + } + + /** + * Prepare a document for Typesense by mapping the primary key to the `id` field. + * Typesense requires every document to have a string `id` field. 
+ */ + private prepareDocument(collectionName: string, doc: SearchHit): SearchHit { + const primaryKey = this.primaryKeyMap.get(collectionName); + if (!primaryKey) { + return doc; + } + + const prepared: SearchHit = { ...doc }; + + // Map primaryKey value → Typesense `id` field + if (prepared[primaryKey] !== undefined && primaryKey !== 'id') { + prepared['id'] = String(prepared[primaryKey]); + } + + // Ensure all values are Typesense-compatible (strings, numbers, booleans, arrays) + for (const [key, value] of Object.entries(prepared)) { + if (value === null || value === undefined) { + delete prepared[key]; + } else if (typeof value === 'object' && !Array.isArray(value)) { + // Typesense doesn't support nested objects — stringify them + prepared[key] = JSON.stringify(value); + } + } + + return prepared; + } + + /** + * Import documents using Typesense's JSONL import endpoint. + * This is more efficient than individual document creation. + */ + private async importDocuments( + collectionName: string, + documents: SearchHit[], + action: 'create' | 'upsert' | 'update' = 'upsert', + ): Promise { + const url = `${this.node}/collections/${encodeURIComponent(collectionName)}/documents/import?action=${action}`; + const headers: Record = { + 'Content-Type': 'text/plain', + 'X-TYPESENSE-API-KEY': this.apiKey, + }; + + const preparedDocs = documents.map((doc) => this.prepareDocument(collectionName, doc)); + const bodyStr = preparedDocs.map((doc) => JSON.stringify(doc)).join('\n'); + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), this.connectionTimeoutMs * 2); + + try { + const response = await fetch(url, { + method: 'POST', + headers, + body: bodyStr, + signal: controller.signal, + }); + + const text = await response.text(); + + if (!response.ok) { + logger.error( + `[TypesenseProvider] Import to ${collectionName} failed with HTTP ${response.status}: ${text}`, + ); + return; + } + + // Response is JSONL — one result per line 
+ const lines = text.trim().split('\n'); + let errorCount = 0; + const errors: string[] = []; + for (const line of lines) { + try { + const result = JSON.parse(line) as { success: boolean; error?: string; document?: string }; + if (!result.success) { + errorCount++; + if (result.error && errors.length < 3) { + errors.push(result.error); + } + } + } catch { + // Skip unparseable lines + } + } + if (errorCount > 0) { + logger.error( + `[TypesenseProvider] Import to ${collectionName}: ${errorCount}/${documents.length} failures. Sample errors: ${errors.join('; ')}`, + ); + } + } finally { + clearTimeout(timeout); + } + } + + // ------------------------------------------------------------------ // + // SearchProvider implementation // + // ------------------------------------------------------------------ // + + async healthCheck(): Promise { + try { + const { status } = await this.request('GET', '/health'); + return status === 200; + } catch (error) { + logger.debug('[TypesenseProvider] Health check failed:', error); + return false; + } + } + + async createIndex(indexName: string, primaryKey: string): Promise { + // Track the primary key for this index so we can map it to `id` later + this.primaryKeyMap.set(indexName, primaryKey); + + // Define searchable string fields based on the index type + // These are the fields that have meiliIndex: true in the mongoose schemas + let stringFields: string[]; + if (indexName === 'messages') { + stringFields = ['messageId', 'conversationId', 'user', 'sender', 'text']; + } else { + // convos + stringFields = ['conversationId', 'title', 'user']; + } + this.searchableFieldsMap.set(indexName, stringFields); + + try { + // Check if collection exists + const { status } = await this.request( + 'GET', + `/collections/${encodeURIComponent(indexName)}`, + ); + if (status === 200) { + logger.debug(`[TypesenseProvider] Collection ${indexName} already exists`); + return; + } + } catch { + // Collection doesn't exist, create it + } + + try { + 
// Typesense requires explicit schema with typed fields. + // The `id` field is Typesense's built-in primary key (always string). + // We define explicit string fields for searchable content, plus a + // catch-all `.*` auto field for any additional dynamic attributes. + const fields: TypesenseField[] = []; + + // Add explicit string fields (excluding 'id' which is built-in) + for (const field of stringFields) { + if (field !== 'id') { + fields.push({ + name: field, + type: 'string', + optional: true, + ...(field === 'user' ? { facet: true } : {}), + }); + } + } + + // Add tags field for convos + if (indexName === 'convos') { + fields.push({ name: 'tags', type: 'string[]', optional: true }); + } + + // Catch-all field for any other dynamic attributes + fields.push({ name: '.*', type: 'auto' }); + + const schema: TypesenseCollectionSchema = { + name: indexName, + fields, + }; + + const { status, data } = await this.request('POST', '/collections', schema); + if (status >= 200 && status < 300) { + logger.info(`[TypesenseProvider] Created collection: ${indexName}`); + } else { + logger.error( + `[TypesenseProvider] Failed to create collection ${indexName}: HTTP ${status} - ${JSON.stringify(data)}`, + ); + } + } catch (error) { + logger.error(`[TypesenseProvider] Error creating collection ${indexName}:`, error); + } + } + + async updateIndexSettings(indexName: string, settings: IndexSettings): Promise { + try { + // Typesense doesn't have a direct "update settings" API like MeiliSearch. + // Filterable attributes in Typesense are handled via field faceting in the schema. + // We update the collection schema to add facet: true for filterable fields. 
+ if (settings.filterableAttributes && settings.filterableAttributes.length > 0) { + const fields: TypesenseField[] = settings.filterableAttributes.map((attr) => ({ + name: attr, + type: 'string', + facet: true, + optional: true, + })); + + await this.request( + 'PATCH', + `/collections/${encodeURIComponent(indexName)}`, + { fields }, + ); + logger.debug(`[TypesenseProvider] Updated schema for ${indexName}`); + } + } catch (error) { + logger.error( + `[TypesenseProvider] Error updating collection settings for ${indexName}:`, + error, + ); + } + } + + async getIndexSettings(indexName: string): Promise { + try { + const { data } = await this.request( + 'GET', + `/collections/${encodeURIComponent(indexName)}`, + ); + const fields = data.fields as TypesenseField[] | undefined; + + const filterableAttributes: string[] = []; + if (fields) { + for (const field of fields) { + if (field.facet) { + filterableAttributes.push(field.name); + } + } + } + return { filterableAttributes }; + } catch (error) { + logger.error( + `[TypesenseProvider] Error getting collection settings for ${indexName}:`, + error, + ); + return {}; + } + } + + async addDocuments( + indexName: string, + documents: SearchHit[], + _primaryKey?: string, + ): Promise { + if (documents.length === 0) { + return; + } + await this.importDocuments(indexName, documents, 'upsert'); + } + + async addDocumentsInBatches( + indexName: string, + documents: SearchHit[], + primaryKey?: string, + batchSize: number = 100, + ): Promise { + for (let i = 0; i < documents.length; i += batchSize) { + const batch = documents.slice(i, i + batchSize); + await this.addDocuments(indexName, batch, primaryKey); + } + } + + async updateDocuments(indexName: string, documents: SearchHit[]): Promise { + if (documents.length === 0) { + return; + } + await this.importDocuments(indexName, documents, 'upsert'); + } + + async deleteDocument(indexName: string, documentId: string): Promise { + try { + await this.request( + 'DELETE', + 
`/collections/${encodeURIComponent(indexName)}/documents/${encodeURIComponent(documentId)}`, + ); + } catch (error) { + logger.error( + `[TypesenseProvider] Error deleting document ${documentId} from ${indexName}:`, + error, + ); + } + } + + async deleteDocuments(indexName: string, documentIds: string[]): Promise { + if (documentIds.length === 0) { + return; + } + // Typesense supports batch delete via filter_by + // For ID-based deletion, we use individual deletes or filter + const filterBy = `id: [${documentIds.map((id) => JSON.stringify(id)).join(',')}]`; + try { + await this.request( + 'DELETE', + `/collections/${encodeURIComponent(indexName)}/documents?filter_by=${encodeURIComponent(filterBy)}`, + ); + } catch { + // Fallback: delete individually + for (const id of documentIds) { + await this.deleteDocument(indexName, id); + } + } + } + + async getDocument(indexName: string, documentId: string): Promise { + try { + // In Typesense, documents are retrieved by their `id` field. + // We mapped primaryKey → id during import, so use documentId directly. + const { status, data } = await this.request( + 'GET', + `/collections/${encodeURIComponent(indexName)}/documents/${encodeURIComponent(documentId)}`, + ); + if (status === 200) { + return data as SearchHit; + } + return null; + } catch { + return null; + } + } + + /** + * Get the query_by fields for a given index. + * Typesense requires explicit field names — wildcard `*` is not supported for query_by. + */ + private getQueryByFields(indexName: string): string { + const fields = this.searchableFieldsMap.get(indexName); + if (fields && fields.length > 0) { + // Filter out the primary key from query_by — it's the `id` field in Typesense + const primaryKey = this.primaryKeyMap.get(indexName); + const queryFields = fields.filter((f) => f !== primaryKey && f !== 'id'); + return queryFields.length > 0 ? 
queryFields.join(',') : fields[0]; + } + // Fallback: use title for convos, text for messages + if (indexName === 'messages') { + return 'text,sender'; + } + return 'title,user'; + } + + async getDocuments( + indexName: string, + options: { limit: number; offset: number }, + ): Promise<{ results: SearchHit[] }> { + try { + // Typesense doesn't have a direct "list documents" API with offset. + // We use the export endpoint for listing, or search with match-all query. + const page = Math.floor(options.offset / options.limit) + 1; + const queryBy = this.getQueryByFields(indexName); + const { data } = await this.request( + 'GET', + `/collections/${encodeURIComponent(indexName)}/documents/search?q=*&query_by=${encodeURIComponent(queryBy)}&per_page=${options.limit}&page=${page}`, + ); + + const typesenseData = data as unknown as TypesenseSearchResponse; + const results: SearchHit[] = (typesenseData.hits || []).map((hit) => hit.document); + return { results }; + } catch (error) { + logger.error(`[TypesenseProvider] Error getting documents from ${indexName}:`, error); + return { results: [] }; + } + } + + async search(indexName: string, query: string, params?: SearchParams): Promise { + try { + const searchQuery = query || '*'; + const perPage = params?.limit ?? 20; + const page = params?.offset !== undefined ? 
Math.floor(params.offset / perPage) + 1 : 1; + const queryBy = this.getQueryByFields(indexName); + + let searchUrl = + `/collections/${encodeURIComponent(indexName)}/documents/search` + + `?q=${encodeURIComponent(searchQuery)}` + + `&query_by=${encodeURIComponent(queryBy)}` + + `&per_page=${perPage}` + + `&page=${page}`; + + // Translate MeiliSearch-style filter to Typesense filter_by + if (params?.filter) { + const typesenseFilter = this.translateFilter(params.filter); + if (typesenseFilter) { + searchUrl += `&filter_by=${encodeURIComponent(typesenseFilter)}`; + } + } + + if (params?.sort) { + const sortBy = params.sort + .map((s) => { + const match = s.match(/^(.+?):(asc|desc)$/i); + const field = match ? match[1] : s; + const order = match ? match[2].toLowerCase() : 'asc'; + return `${field}:${order}`; + }) + .join(','); + searchUrl += `&sort_by=${encodeURIComponent(sortBy)}`; + } + + const { data } = await this.request('GET', searchUrl); + const typesenseData = data as unknown as TypesenseSearchResponse; + + const hits: SearchHit[] = (typesenseData.hits || []).map((hit) => hit.document); + + return { + hits, + totalHits: typesenseData.found || 0, + offset: params?.offset, + limit: params?.limit, + }; + } catch (error) { + logger.error(`[TypesenseProvider] Error searching ${indexName}:`, error); + return { hits: [], totalHits: 0 }; + } + } + + // ------------------------------------------------------------------ // + // Filter translation helpers // + // ------------------------------------------------------------------ // + + /** + * Translate a MeiliSearch-style filter string to Typesense filter_by syntax. 
+ * MeiliSearch: `user = "userId"` or `user = 'userId'` + * Typesense: `user:=userId` + */ + private translateFilter(filter: string): string { + const parts = filter.split(/\s+AND\s+/i); + const typesenseParts: string[] = []; + + for (const part of parts) { + const match = part.trim().match(/^(\w+)\s*=\s*(["'])(.*)\2$/); + if (match) { + const [, field, , value] = match; + typesenseParts.push(`${field}:=${value}`); + } + } + + return typesenseParts.join(' && '); + } +}