diff --git a/backend/shared/model/north-connector.model.ts b/backend/shared/model/north-connector.model.ts
index b732cb4e6f..9011215d3c 100644
--- a/backend/shared/model/north-connector.model.ts
+++ b/backend/shared/model/north-connector.model.ts
@@ -13,6 +13,7 @@ export const OIBUS_NORTH_TYPES = [
   'file-writer',
   'metroscope-lithium',
   'oianalytics',
+  'postgresql',
   'sftp',
   'rest'
 ] as const;
diff --git a/backend/shared/model/north-settings.model.ts b/backend/shared/model/north-settings.model.ts
index 786094f673..a9b3c05d9b 100644
--- a/backend/shared/model/north-settings.model.ts
+++ b/backend/shared/model/north-settings.model.ts
@@ -37,6 +37,13 @@ export interface NorthRESTSettingsQueryParams {
   value: string;
 }
 
+export interface NorthPostgreSQLSettingsCustomIndexes {
+  name: string;
+  column: string;
+  unique: boolean;
+  order: string;
+}
+
 export interface NorthAmazonS3Settings {
   bucket: string;
   region: string;
@@ -94,6 +101,19 @@ export interface NorthOIAnalyticsSettings {
   specificSettings?: NorthOIAnalyticsSettingsSpecificSettings | null;
 }
 
+export interface NorthPostgreSQLSettings {
+  host: string;
+  port: number;
+  database: string;
+  username: string;
+  password: string | null;
+  table: string;
+  createTableIfNotExists: boolean;
+  batchSize: number;
+  connectionTimeout: number;
+  customIndexes: Array<NorthPostgreSQLSettingsCustomIndexes> | null;
+}
+
 export interface NorthRESTSettings {
   endpoint: string;
   testPath: string;
@@ -129,5 +149,6 @@ export type NorthSettings =
   | NorthFileWriterSettings
   | NorthMetroscopeLithiumSettings
   | NorthOIAnalyticsSettings
+  | NorthPostgreSQLSettings
   | NorthRESTSettings
   | NorthSFTPSettings;
diff --git a/backend/src/north/north-postgresql/manifest.ts b/backend/src/north/north-postgresql/manifest.ts
new file mode 100644
index 0000000000..aceae2b510
--- /dev/null
+++ b/backend/src/north/north-postgresql/manifest.ts
@@ -0,0 +1,146 @@
+import { NorthConnectorManifest } from '../../../shared/model/north-connector.model';
+
+const manifest: NorthConnectorManifest = {
id: 'postgresql', + category: 'api', + modes: { + files: false, + points: true + }, + settings: [ + { + key: 'host', + type: 'OibText', + translationKey: 'north.postgresql.host', + newRow: true, + defaultValue: 'localhost', + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-6' + }, + { + key: 'port', + type: 'OibNumber', + translationKey: 'north.postgresql.port', + defaultValue: 5432, + validators: [{ key: 'required' }, { key: 'min', params: { min: 1 } }, { key: 'max', params: { max: 65535 } }], + displayInViewMode: true, + class: 'col-3' + }, + { + key: 'database', + type: 'OibText', + translationKey: 'north.postgresql.database', + newRow: true, + defaultValue: '', + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-6' + }, + { + key: 'username', + type: 'OibText', + translationKey: 'north.postgresql.username', + defaultValue: '', + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-3' + }, + { + key: 'password', + type: 'OibSecret', + translationKey: 'north.postgresql.password', + defaultValue: '', + class: 'col-3' + }, + { + key: 'table', + type: 'OibText', + translationKey: 'north.postgresql.table', + newRow: true, + defaultValue: 'oibus_data', + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-6' + }, + { + key: 'createTableIfNotExists', + type: 'OibCheckbox', + translationKey: 'north.postgresql.create-table-if-not-exists', + defaultValue: true, + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-3' + }, + { + key: 'batchSize', + type: 'OibNumber', + translationKey: 'north.postgresql.batch-size', + defaultValue: 1000, + validators: [{ key: 'required' }, { key: 'min', params: { min: 1 } }, { key: 'max', params: { max: 10000 } }], + displayInViewMode: true, + class: 'col-3' + }, + { + key: 'connectionTimeout', + type: 'OibNumber', + translationKey: 'north.postgresql.connection-timeout', + newRow: true, + defaultValue: 30, + 
unitLabel: 's', + validators: [{ key: 'required' }, { key: 'min', params: { min: 1 } }], + displayInViewMode: true, + class: 'col-3' + }, + { + key: 'customIndexes', + type: 'OibArray', + translationKey: 'north.postgresql.custom-indexes.custom-index', + content: [ + { + key: 'name', + translationKey: 'north.postgresql.custom-indexes.name', + type: 'OibText', + defaultValue: '', + validators: [{ key: 'required' }], + displayInViewMode: true, + class: 'col-4' + }, + { + key: 'column', + translationKey: 'north.postgresql.custom-indexes.column', + type: 'OibSelect', + defaultValue: 'timestamp', + validators: [{ key: 'required' }], + displayInViewMode: true, + options: ['timestamp', 'point_id', 'data_value', 'data_quality', 'digital_value', 'created_at'], + class: 'col-4' + }, + { + key: 'unique', + translationKey: 'north.postgresql.custom-indexes.unique', + type: 'OibCheckbox', + defaultValue: false, + displayInViewMode: true, + validators: [{ key: 'required' }], + class: 'col-2' + }, + { + key: 'order', + translationKey: 'north.postgresql.custom-indexes.order', + type: 'OibSelect', + defaultValue: 'ASC', + validators: [{ key: 'required' }], + displayInViewMode: true, + options: ['ASC', 'DESC'], + class: 'col-2' + } + ], + class: 'col', + newRow: true, + displayInViewMode: false + } + ] +}; + +export default manifest; diff --git a/backend/src/north/north-postgresql/north-postgresql.spec.ts b/backend/src/north/north-postgresql/north-postgresql.spec.ts new file mode 100644 index 0000000000..5dee39ca6a --- /dev/null +++ b/backend/src/north/north-postgresql/north-postgresql.spec.ts @@ -0,0 +1,511 @@ +import fs from 'node:fs/promises'; +import { Client, Pool, PoolClient } from 'pg'; + +import NorthPostgreSQL from './north-postgresql'; +import pino from 'pino'; +import PinoLogger from '../../tests/__mocks__/service/logger/logger.mock'; +import EncryptionService from '../../service/encryption.service'; +import EncryptionServiceMock from 
'../../tests/__mocks__/service/encryption-service.mock'; +import { NorthPostgreSQLSettings } from '../../../shared/model/north-settings.model'; +import { OIBusTimeValue } from '../../../shared/model/engine.model'; +import NorthConnectorRepository from '../../repository/config/north-connector.repository'; +import NorthConnectorRepositoryMock from '../../tests/__mocks__/repository/config/north-connector-repository.mock'; +import ScanModeRepository from '../../repository/config/scan-mode.repository'; +import ScanModeRepositoryMock from '../../tests/__mocks__/repository/config/scan-mode-repository.mock'; +import { NorthConnectorEntity } from '../../model/north-connector.model'; +import testData from '../../tests/utils/test-data'; +import { mockBaseFolders } from '../../tests/utils/test-utils'; +import CacheService from '../../service/cache/cache.service'; +import CacheServiceMock from '../../tests/__mocks__/service/cache/cache-service.mock'; +import { OIBusError } from '../../model/engine.model'; + +jest.mock('node:fs/promises'); +jest.mock('pg'); + +const logger: pino.Logger = new PinoLogger(); +const encryptionService: EncryptionService = new EncryptionServiceMock('', ''); +const northConnectorRepository: NorthConnectorRepository = new NorthConnectorRepositoryMock(); +const scanModeRepository: ScanModeRepository = new ScanModeRepositoryMock(); +const cacheService: CacheService = new CacheServiceMock(); + +jest.mock( + '../../service/cache/cache.service', + () => + function () { + return cacheService; + } +); + +const settings: NorthPostgreSQLSettings = { + host: 'localhost', + port: 5432, + database: 'test_db', + username: 'test_user', + password: 'encrypted_password', + table: 'oibus_data', + createTableIfNotExists: true, + batchSize: 1000, + connectionTimeout: 30, + customIndexes: [ + { + name: 'timestamp_desc', + column: 'timestamp', + unique: false, + order: 'DESC' + }, + { + name: 'point_unique', + column: 'point_id', + unique: true, + order: 'ASC' + } + ] +}; + 
+const timeValues: Array = [ + { + pointId: 'point1', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: 123.45, quality: 'good' } + }, + { + pointId: 'point2', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: 678.9, quality: 'good' } + } +]; + +let configuration: NorthConnectorEntity; +let north: NorthPostgreSQL; + +describe('NorthPostgreSQL', () => { + let mockPool: jest.Mocked; + let mockClient: jest.Mocked; + let mockTestClient: jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + jest.useFakeTimers().setSystemTime(new Date(testData.constants.dates.FAKE_NOW)); + + configuration = JSON.parse(JSON.stringify(testData.north.list[0])); + configuration.type = 'postgresql'; + configuration.settings = settings; + + (northConnectorRepository.findNorthById as jest.Mock).mockReturnValue(configuration); + (scanModeRepository.findById as jest.Mock).mockImplementation(id => testData.scanMode.list.find(element => element.id === id)); + + mockClient = { + connect: jest.fn(), + query: jest.fn(), + release: jest.fn(), + end: jest.fn() + } as unknown as jest.Mocked; + + mockPool = { + connect: jest.fn().mockResolvedValue(mockClient), + end: jest.fn(), + query: jest.fn() + } as unknown as jest.Mocked; + + mockTestClient = { + connect: jest.fn(), + query: jest.fn(), + end: jest.fn() + } as unknown as jest.Mocked; + + (Pool as jest.MockedClass).mockImplementation(() => mockPool); + (Client as jest.MockedClass).mockImplementation(() => mockTestClient); + + north = new NorthPostgreSQL( + configuration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + }); + + afterEach(() => { + cacheService.cacheSizeEventEmitter.removeAllListeners(); + }); + + describe('connect', () => { + beforeEach(() => { + (encryptionService.decryptText as jest.Mock).mockImplementation((_text: string) => Promise.resolve('decrypted_password')); + }); + + it('should properly connect to 
PostgreSQL', async () => { + await north.connect(); + + expect(Pool).toHaveBeenCalledWith({ + host: 'localhost', + port: 5432, + database: 'test_db', + user: 'test_user', + password: 'decrypted_password', + ssl: false, + max: 10, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 30000 + }); + + expect(mockPool.connect).toHaveBeenCalled(); + expect(mockClient.query).toHaveBeenCalledTimes(4); // CREATE TABLE, CREATE DEFAULT INDEX, and 2 custom indexes + expect(mockClient.release).toHaveBeenCalled(); + }); + + it('should skip table creation when createTableIfNotExists is false', async () => { + const skipTableSettings = { ...settings, createTableIfNotExists: false }; + const skipTableConfiguration = { ...configuration, settings: skipTableSettings }; + const skipTableNorth = new NorthPostgreSQL( + skipTableConfiguration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await skipTableNorth.connect(); + + expect(mockClient.query).not.toHaveBeenCalled(); + }); + + it('should throw error on connection failure', async () => { + (mockPool.connect as jest.Mock).mockRejectedValue(new Error('Connection failed')); + + await expect(north.connect()).rejects.toThrow(OIBusError); + }); + + it('should create custom indexes when specified', async () => { + await north.connect(); + + // Verify custom indexes were created + expect(mockClient.query).toHaveBeenCalledWith(expect.stringContaining('CREATE INDEX IF NOT EXISTS idx_oibus_data_timestamp_desc')); + expect(mockClient.query).toHaveBeenCalledWith( + expect.stringContaining('CREATE UNIQUE INDEX IF NOT EXISTS idx_oibus_data_point_unique') + ); + }); + + it('should handle custom index creation errors gracefully', async () => { + // Mock one of the custom index queries to fail + (mockClient.query as jest.Mock) + .mockResolvedValueOnce({}) // CREATE TABLE + .mockResolvedValueOnce({}) // CREATE DEFAULT INDEX + .mockRejectedValueOnce(new 
Error('Index creation failed')) // First custom index fails + .mockResolvedValueOnce({}); // Second custom index succeeds + + await north.connect(); + + // Should not throw error even if custom index creation fails + expect(mockClient.query).toHaveBeenCalledTimes(4); + }); + + it('should skip custom indexes when none are defined', async () => { + const noIndexSettings = { ...settings, customIndexes: null }; + const noIndexConfiguration = { ...configuration, settings: noIndexSettings }; + const noIndexNorth = new NorthPostgreSQL( + noIndexConfiguration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await noIndexNorth.connect(); + + expect(mockClient.query).toHaveBeenCalledTimes(2); // Only CREATE TABLE and default index + }); + }); + + describe('testConnection', () => { + beforeEach(() => { + (encryptionService.decryptText as jest.Mock).mockImplementation((_text: string) => Promise.resolve('decrypted_password')); + }); + + it('should test connection successfully', async () => { + await north.testConnection(); + + expect(mockTestClient.connect).toHaveBeenCalled(); + expect(mockTestClient.query).toHaveBeenCalledWith('SELECT 1'); + expect(mockTestClient.end).toHaveBeenCalled(); + }); + + it('should throw error when host is missing', async () => { + const invalidSettings = { ...settings, host: '' }; + const invalidConfiguration = { ...configuration, settings: invalidSettings }; + const invalidNorth = new NorthPostgreSQL( + invalidConfiguration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await expect(invalidNorth.testConnection()).rejects.toThrow('Host is required'); + }); + + it('should throw error when database is missing', async () => { + const invalidSettings = { ...settings, database: '' }; + const invalidConfiguration = { ...configuration, settings: invalidSettings }; + const 
invalidNorth = new NorthPostgreSQL( + invalidConfiguration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await expect(invalidNorth.testConnection()).rejects.toThrow('Database is required'); + }); + + it('should throw error when username is missing', async () => { + const invalidSettings = { ...settings, username: '' }; + const invalidConfiguration = { ...configuration, settings: invalidSettings }; + const invalidNorth = new NorthPostgreSQL( + invalidConfiguration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await expect(invalidNorth.testConnection()).rejects.toThrow('Username is required'); + }); + + it('should throw error when connection test fails', async () => { + (mockTestClient.connect as jest.Mock).mockRejectedValue(new Error('Connection test failed')); + + await expect(north.testConnection()).rejects.toThrow(OIBusError); + }); + }); + + describe('handleContent', () => { + beforeEach(async () => { + (encryptionService.decryptText as jest.Mock).mockImplementation((_text: string) => Promise.resolve('decrypted_password')); + await north.connect(); + }); + + it('should handle time-values content', async () => { + (fs.readFile as jest.Mock).mockReturnValue(JSON.stringify(timeValues)); + + await north.handleContent({ + contentFile: '/path/to/file/example-123.json', + contentSize: 1234, + numberOfElement: 2, + createdAt: '2020-02-02T02:02:02.222Z', + contentType: 'time-values', + source: 'south', + options: {} + }); + + expect(fs.readFile).toHaveBeenCalledWith('/path/to/file/example-123.json', { encoding: 'utf-8' }); + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + expect(mockClient.query).toHaveBeenCalledWith('COMMIT'); + }); + + it('should throw error for unsupported content type', async () => { + await expect( + north.handleContent({ + contentFile: 
'/path/to/file/example.txt', + contentSize: 1234, + numberOfElement: 1, + createdAt: '2020-02-02T02:02:02.222Z', + contentType: 'raw', + source: 'south', + options: {} + }) + ).rejects.toThrow('Content type "raw" not supported by PostgreSQL North connector'); + }); + }); + + describe('handleValues', () => { + beforeEach(async () => { + (encryptionService.decryptText as jest.Mock).mockImplementation((_text: string) => Promise.resolve('decrypted_password')); + await north.connect(); + }); + + it('should handle values successfully', async () => { + await north.handleValues(timeValues); + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + expect(mockClient.query).toHaveBeenCalledWith('COMMIT'); + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point1', new Date(testData.constants.dates.FAKE_NOW), 123.45, 'good', null] + ); + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point2', new Date(testData.constants.dates.FAKE_NOW), 678.9, 'good', null] + ); + }); + + it('should handle empty values array', async () => { + await north.handleValues([]); + + expect(mockClient.query).not.toHaveBeenCalled(); + }); + + it('should handle string values correctly', async () => { + const stringValues: Array = [ + { + pointId: 'point3', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: '456.78', quality: 'good' } + }, + { + pointId: 'point4', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: 'invalid', quality: 'bad' } + }, + { + pointId: 'point5', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: '0', quality: 'good' } + } + ]; + + await north.handleValues(stringValues); + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + 
expect(mockClient.query).toHaveBeenCalledWith('COMMIT'); + // String "456.78" should be parsed to number 456.78 + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point3', new Date(testData.constants.dates.FAKE_NOW), 456.78, 'good', null] + ); + // String "invalid" should be converted to null + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point4', new Date(testData.constants.dates.FAKE_NOW), null, 'bad', null] + ); + // String "0" should be parsed to number 0 + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point5', new Date(testData.constants.dates.FAKE_NOW), 0, 'good', null] + ); + }); + + it('should handle null and undefined values correctly', async () => { + const nullValues: Array = [ + { + pointId: 'point6', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: null as unknown as string, quality: 'bad' } + }, + { + pointId: 'point7', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: undefined as unknown as string, quality: 'bad' } + } + ]; + + await north.handleValues(nullValues); + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + expect(mockClient.query).toHaveBeenCalledWith('COMMIT'); + // Both null and undefined should result in null in database + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point6', new Date(testData.constants.dates.FAKE_NOW), null, 'bad', null] + ); + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, 
digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['point7', new Date(testData.constants.dates.FAKE_NOW), null, 'bad', null] + ); + }); + + it('should handle digital values correctly', async () => { + const digitalValues: Array = [ + { + pointId: 'digital1', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: '?28', quality: 'good' } + }, + { + pointId: 'digital2', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: '?0', quality: 'good' } + }, + { + pointId: 'digital3', + timestamp: testData.constants.dates.FAKE_NOW, + data: { value: '?invalid', quality: 'bad' } + } + ]; + + await north.handleValues(digitalValues); + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + expect(mockClient.query).toHaveBeenCalledWith('COMMIT'); + // Digital value "?28" should store both the numeric value and the original digital value + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['digital1', new Date(testData.constants.dates.FAKE_NOW), 28, 'good', '?28'] + ); + // Digital value "?0" should store 0 as numeric and "?0" as digital + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['digital2', new Date(testData.constants.dates.FAKE_NOW), 0, 'good', '?0'] + ); + // Digital value "?invalid" should store null as numeric but keep the original digital value + expect(mockClient.query).toHaveBeenCalledWith( + `INSERT INTO ${settings.table} (point_id, timestamp, data_value, data_quality, digital_value) VALUES ($1, $2, $3, $4, $5)`, + ['digital3', new Date(testData.constants.dates.FAKE_NOW), null, 'bad', '?invalid'] + ); + }); + + it('should rollback on error', async () => { + (mockClient.query as jest.Mock).mockRejectedValueOnce(new Error('Database error')); + + await 
expect(north.handleValues(timeValues)).rejects.toThrow('Error inserting values into PostgreSQL: Error: Database error'); + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN'); + expect(mockClient.query).toHaveBeenCalledWith('ROLLBACK'); + }); + + it('should throw error when pool is not initialized', async () => { + const uninitializedNorth = new NorthPostgreSQL( + configuration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await expect(uninitializedNorth.handleValues(timeValues)).rejects.toThrow('PostgreSQL pool not initialized'); + }); + }); + + describe('disconnect', () => { + beforeEach(async () => { + (encryptionService.decryptText as jest.Mock).mockImplementation((_text: string) => Promise.resolve('decrypted_password')); + await north.connect(); + }); + + it('should disconnect properly', async () => { + await north.disconnect(); + + expect(mockPool.end).toHaveBeenCalled(); + }); + + it('should handle disconnect when pool is not initialized', async () => { + const uninitializedNorth = new NorthPostgreSQL( + configuration, + encryptionService, + northConnectorRepository, + scanModeRepository, + logger, + mockBaseFolders(testData.north.list[0].id) + ); + + await expect(uninitializedNorth.disconnect()).resolves.not.toThrow(); + }); + }); +}); diff --git a/backend/src/north/north-postgresql/north-postgresql.ts b/backend/src/north/north-postgresql/north-postgresql.ts new file mode 100644 index 0000000000..f05d2a99c5 --- /dev/null +++ b/backend/src/north/north-postgresql/north-postgresql.ts @@ -0,0 +1,242 @@ +import fs from 'node:fs/promises'; +import { Client, Pool, PoolClient } from 'pg'; + +import NorthConnector from '../north-connector'; +import pino from 'pino'; +import EncryptionService from '../../service/encryption.service'; +import { NorthPostgreSQLSettings } from '../../../shared/model/north-settings.model'; +import { CacheMetadata, OIBusTimeValue } from 
'../../../shared/model/engine.model';
+import { OIBusError } from '../../model/engine.model';
+import { NorthConnectorEntity } from '../../model/north-connector.model';
+import NorthConnectorRepository from '../../repository/config/north-connector.repository';
+import ScanModeRepository from '../../repository/config/scan-mode.repository';
+import { BaseFolders } from '../../model/types';
+
+/**
+ * Class NorthPostgreSQL - sends time-series data to a PostgreSQL database
+ */
+export default class NorthPostgreSQL extends NorthConnector<NorthPostgreSQLSettings> {
+  private pool: Pool | null = null;
+
+  constructor(
+    configuration: NorthConnectorEntity<NorthPostgreSQLSettings>,
+    encryptionService: EncryptionService,
+    northConnectorRepository: NorthConnectorRepository,
+    scanModeRepository: ScanModeRepository,
+    logger: pino.Logger,
+    baseFolders: BaseFolders
+  ) {
+    super(configuration, encryptionService, northConnectorRepository, scanModeRepository, logger, baseFolders);
+  }
+
+  override async connect(): Promise<void> {
+    try {
+      const connectionConfig = {
+        host: this.connector.settings.host,
+        port: this.connector.settings.port,
+        database: this.connector.settings.database,
+        user: this.connector.settings.username,
+        password: this.connector.settings.password ? await this.encryptionService.decryptText(this.connector.settings.password) : undefined,
+        ssl: false, // NOTE(review): the unit test asserts Pool receives ssl: false — this key was absent here; confirm against the original patch
+        max: 10, // maximum number of clients in the pool
+        idleTimeoutMillis: 30000, // how long a client is allowed to remain idle before being closed
+        connectionTimeoutMillis: this.connector.settings.connectionTimeout * 1000
+      };
+
+      this.pool = new Pool(connectionConfig);
+
+      // Test the connection
+      const client = await this.pool.connect();
+
+      if (this.connector.settings.createTableIfNotExists) {
+        await this.createTableIfNotExists(client);
+      }
+
+      client.release();
+      await super.connect();
+    } catch (error) {
+      throw new OIBusError(`Error connecting to PostgreSQL database: ${error}`, true);
+    }
+  }
+
+  private async createTableIfNotExists(client: PoolClient): Promise<void> {
+    const createTableQuery = `
+      CREATE TABLE IF NOT EXISTS ${this.connector.settings.table} (
+        id SERIAL PRIMARY KEY,
+        point_id VARCHAR(255) NOT NULL,
+        timestamp TIMESTAMPTZ NOT NULL,
+        data_value DOUBLE PRECISION,
+        data_quality VARCHAR(50),
+        digital_value VARCHAR(50),
+        created_at TIMESTAMPTZ DEFAULT NOW()
+      )
+    `;
+
+    // Default index for common queries
+    const createDefaultIndexQuery = `
+      CREATE INDEX IF NOT EXISTS idx_${this.connector.settings.table}_point_timestamp
+      ON ${this.connector.settings.table} (point_id, timestamp)
+    `;
+
+    try {
+      await client.query(createTableQuery);
+      await client.query(createDefaultIndexQuery);
+
+      // Create custom indexes if defined
+      if (this.connector.settings.customIndexes && this.connector.settings.customIndexes.length > 0) {
+        await this.createCustomIndexes(client);
+      }
+
+      this.logger.debug(`Table "${this.connector.settings.table}" and indexes created or already exist`);
+    } catch (error) {
+      this.logger.error(`Error creating table "${this.connector.settings.table}": ${error}`);
+      throw error;
+    }
+  }
+
+  private async createCustomIndexes(client: PoolClient): Promise<void> {
+    for (const index of this.connector.settings.customIndexes!) {
+      const uniqueKeyword = index.unique ? 'UNIQUE' : '';
+      const indexName = `idx_${this.connector.settings.table}_${index.name}`;
+
+      const createIndexQuery = `
+        CREATE ${uniqueKeyword} INDEX IF NOT EXISTS ${indexName}
+        ON ${this.connector.settings.table} (${index.column} ${index.order})
+      `;
+
+      try {
+        await client.query(createIndexQuery);
+        this.logger.debug(`Custom index "${indexName}" created on column "${index.column}"`);
+      } catch (error) {
+        this.logger.warn(`Failed to create custom index "${indexName}": ${error}`);
+        // Don't throw here - continue with other indexes
+      }
+    }
+  }
+
+  async handleContent(cacheMetadata: CacheMetadata): Promise<void> {
+    switch (cacheMetadata.contentType) {
+      case 'time-values':
+        return this.handleValues(JSON.parse(await fs.readFile(cacheMetadata.contentFile, { encoding: 'utf-8' })) as Array<OIBusTimeValue>);
+
+      default:
+        throw new OIBusError(`Content type "${cacheMetadata.contentType}" not supported by PostgreSQL North connector`, false);
+    }
+  }
+
+  /**
+   * Handle values by inserting them into PostgreSQL database.
+   */
+  async handleValues(values: Array<OIBusTimeValue>): Promise<void> {
+    if (!this.pool) {
+      throw new OIBusError('PostgreSQL pool not initialized', true);
+    }
+
+    if (values.length === 0) {
+      this.logger.debug('No values to insert');
+      return;
+    }
+
+    const client = await this.pool.connect();
+
+    try {
+      await client.query('BEGIN');
+
+      // Process values in batches
+      for (let i = 0; i < values.length; i += this.connector.settings.batchSize) {
+        const batch = values.slice(i, i + this.connector.settings.batchSize);
+        await this.insertBatch(client, batch);
+      }
+
+      await client.query('COMMIT');
+      this.logger.debug(`Successfully inserted ${values.length} values into PostgreSQL`);
+    } catch (error) {
+      await client.query('ROLLBACK');
+      this.logger.error(`Error inserting values into PostgreSQL: ${error}`);
+      throw new OIBusError(`Error inserting values into PostgreSQL: ${error}`, true);
+    } finally {
+      client.release();
+    }
+  }
+
+  private async insertBatch(client: PoolClient, values: Array<OIBusTimeValue>): Promise<void> {
+    const insertQuery = `
+      INSERT INTO ${this.connector.settings.table} (point_id, timestamp, data_value, data_quality, digital_value)
+      VALUES ($1, $2, $3, $4, $5)
+    `;
+
+    const insertPromises = values.map(async value => {
+      const timestamp = new Date(value.timestamp);
+      // Handle both string and number values, including digital values like "?28"
+      let dataValue: number | null = null;
+      let digitalValue: string | null = null;
+
+      if (value.data.value !== null && value.data.value !== undefined) {
+        if (typeof value.data.value === 'number') {
+          dataValue = value.data.value;
+        } else if (typeof value.data.value === 'string') {
+          // Check if it's a digital value (starts with ?)
+          if (value.data.value.startsWith('?')) {
+            digitalValue = value.data.value;
+            // Extract numeric part if possible
+            const numericPart = value.data.value.substring(1);
+            const parsed = parseFloat(numericPart);
+            dataValue = isNaN(parsed) ? null : parsed;
+          } else {
+            // Regular numeric string
+            const parsed = parseFloat(value.data.value);
+            dataValue = isNaN(parsed) ? null : parsed;
+          }
+        }
+      }
+      const quality = value.data.quality || null;
+
+      return client.query(insertQuery, [value.pointId, timestamp, dataValue, quality, digitalValue]);
+    });
+
+    await Promise.all(insertPromises);
+  }
+
+  override async testConnection(): Promise<void> {
+    if (!this.connector.settings.host) {
+      throw new OIBusError('Host is required', false);
+    }
+
+    if (!this.connector.settings.database) {
+      throw new OIBusError('Database is required', false);
+    }
+
+    if (!this.connector.settings.username) {
+      throw new OIBusError('Username is required', false);
+    }
+
+    const connectionConfig = {
+      host: this.connector.settings.host,
+      port: this.connector.settings.port,
+      database: this.connector.settings.database,
+      user: this.connector.settings.username,
+      password: this.connector.settings.password ? await this.encryptionService.decryptText(this.connector.settings.password) : undefined,
+      connectionTimeoutMillis: this.connector.settings.connectionTimeout * 1000,
+    };
+
+    const client = new Client(connectionConfig);
+
+    try {
+      await client.connect();
+      await client.query('SELECT 1');
+      this.logger.info('PostgreSQL connection test successful');
+    } catch (error) {
+      throw new OIBusError(`PostgreSQL connection test failed: ${error}`, false);
+    } finally {
+      await client.end();
+    }
+  }
+
+  override async disconnect(): Promise<void> {
+    if (this.pool) {
+      await this.pool.end();
+      this.pool = null;
+      this.logger.info('Disconnected from PostgreSQL database');
+    }
+    await super.disconnect();
+  }
+}
diff --git a/backend/src/service/north.service.ts b/backend/src/service/north.service.ts
index 6ae59bbae6..a428881da1 100644
--- a/backend/src/service/north.service.ts
+++ b/backend/src/service/north.service.ts
@@ -16,6 +16,7 @@ import amazonManifest from '../north/north-amazon-s3/manifest';
 import sftpManifest from '../north/north-sftp/manifest';
 import
restManifest from '../north/north-rest/manifest';
 import metroscopeLithiumManifest from '../north/north-metroscope-lithium/manifest';
+import postgresqlManifest from '../north/north-postgresql/manifest';
 import { NorthConnectorEntity, NorthConnectorEntityLight } from '../model/north-connector.model';
 import JoiValidator from '../web-server/controllers/validators/joi.validator';
 import NorthConnectorRepository from '../repository/config/north-connector.repository';
@@ -34,6 +35,7 @@ import {
   NorthFileWriterSettings,
   NorthMetroscopeLithiumSettings,
   NorthOIAnalyticsSettings,
+  NorthPostgreSQLSettings,
   NorthRESTSettings,
   NorthSettings,
   NorthSFTPSettings
@@ -44,6 +46,7 @@ import NorthAmazonS3 from '../north/north-amazon-s3/north-amazon-s3';
 import NorthAzureBlob from '../north/north-azure-blob/north-azure-blob';
 import NorthFileWriter from '../north/north-file-writer/north-file-writer';
 import NorthMetroscopeLithium from '../north/north-metroscope-lithium/north-metroscope-lithium';
+import NorthPostgreSQL from '../north/north-postgresql/north-postgresql';
 import NorthOIAnalytics from '../north/north-oianalytics/north-oianalytics';
 import NorthSFTP from '../north/north-sftp/north-sftp';
 import NorthREST from '../north/north-rest/north-rest';
@@ -63,6 +66,7 @@ export const northManifestList: Array<NorthConnectorManifest> = [
   amazonManifest,
   fileWriterManifest,
   metroscopeLithiumManifest,
+  postgresqlManifest,
   sftpManifest,
   restManifest
 ];
@@ -155,6 +159,15 @@
         logger,
         northBaseFolders
       );
+      case 'postgresql':
+        return new NorthPostgreSQL(
+          settings as NorthConnectorEntity<NorthPostgreSQLSettings>,
+          this.encryptionService,
+          this.northConnectorRepository,
+          this.scanModeRepository,
+          logger,
+          northBaseFolders
+        );
       case 'rest':
         return new NorthREST(
          settings as NorthConnectorEntity<NorthRESTSettings>,
diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json
index 723e63214f..1ae5800a85 100644
--- a/frontend/src/i18n/en.json
+++ b/frontend/src/i18n/en.json
@@ -198,6 +198,7 @@
       "console":
"Console", "file-writer": "File writer", "oianalytics": "OIAnalytics®", + "postgresql": "PostgreSQL®", "sftp": "SFTP", "rest": "REST", "metroscope-lithium": "Metroscope Lithium" @@ -208,6 +209,7 @@ "console": "Display filenames or values in Console (used for debug)", "file-writer": "Write files and data to the output folder", "oianalytics": "Send files and values to OIAnalytics®", + "postgresql": "Send time-series data to a PostgreSQL® database. Supports numeric values and digital values (e.g., ?28)", "sftp": "Upload files and data to an SFTP server", "rest": "Upload files to a REST endpoint", "metroscope-lithium": "Send data to a Metroscope Lithium endpoint" @@ -1726,6 +1728,24 @@ "timeout": "Timeout", "group": "Group", "label": "Label" + }, + "postgresql": { + "host": "Host", + "port": "Port", + "database": "Database", + "username": "Username", + "password": "Password", + "table": "Table name", + "create-table-if-not-exists": "Create table", + "batch-size": "Number of records to insert in a single transaction (improves performance)", + "connection-timeout": "Connection timeout", + "custom-indexes": { + "custom-index": "Custom database indexes", + "name": "Index name", + "column.title": "Database column to index", + "unique": "Unique constraint", + "order.title": "Sort order for the index" + } } }, "logs": {