diff --git a/__tests__/__assets__/beershop-admin-service.sql b/__tests__/__assets__/beershop-admin-service.sql index f48f40e..34ae038 100644 --- a/__tests__/__assets__/beershop-admin-service.sql +++ b/__tests__/__assets__/beershop-admin-service.sql @@ -42,6 +42,8 @@ CREATE TABLE csw_TypeChecks ( type_Binary CHAR(100), type_LargeBinary BYTEA, type_LargeString TEXT, + type_mediaType VARCHAR(5000), + type_mediaContent BYTEA, PRIMARY KEY(ID) ); diff --git a/__tests__/__assets__/cap-proj/db/schema.cds b/__tests__/__assets__/cap-proj/db/schema.cds index f2a86b3..6c17692 100644 --- a/__tests__/__assets__/cap-proj/db/schema.cds +++ b/__tests__/__assets__/cap-proj/db/schema.cds @@ -33,6 +33,8 @@ entity TypeChecks : cuid { type_Binary : Binary(100); type_LargeBinary : LargeBinary; type_LargeString : LargeString; + type_mediaType : String @Core.IsMediaType: true; + type_mediaContent : LargeBinary @Core.MediaType : type_mediaType; virtual type_virtual : Integer; } diff --git a/__tests__/__assets__/test.sql b/__tests__/__assets__/test.sql index 3a2019e..06e0651 100644 --- a/__tests__/__assets__/test.sql +++ b/__tests__/__assets__/test.sql @@ -36,6 +36,8 @@ CREATE TABLE csw_TypeChecks ( type_Binary CHAR(100), type_LargeBinary BYTEA, type_LargeString TEXT, + type_mediaType VARCHAR(5000), + type_mediaContent BYTEA, PRIMARY KEY(ID) ); @@ -73,6 +75,8 @@ CREATE TABLE BeershopService_TypeChecksWithDraft_drafts ( type_Binary CHAR(100) NULL, type_LargeBinary BYTEA NULL, type_LargeString TEXT NULL, + type_mediaType VARCHAR(5000) NULL, + type_mediaContent BYTEA NULL, IsActiveEntity BOOLEAN, HasActiveEntity BOOLEAN, HasDraftEntity BOOLEAN, @@ -126,7 +130,9 @@ CREATE VIEW BeershopService_TypeChecks AS SELECT TypeChecks_0.type_String, TypeChecks_0.type_Binary, TypeChecks_0.type_LargeBinary, - TypeChecks_0.type_LargeString + TypeChecks_0.type_LargeString, + TypeChecks_0.type_mediaType, + TypeChecks_0.type_mediaContent FROM csw_TypeChecks AS TypeChecks_0; CREATE VIEW BeershopService_TypeChecksWithDraft AS SELECT @@ -143,7 +149,9 @@ CREATE VIEW BeershopService_TypeChecksWithDraft AS SELECT TypeChecks_0.type_String, TypeChecks_0.type_Binary, TypeChecks_0.type_LargeBinary, - TypeChecks_0.type_LargeString + TypeChecks_0.type_LargeString, + TypeChecks_0.type_mediaType, + TypeChecks_0.type_mediaContent FROM csw_TypeChecks AS TypeChecks_0; CREATE VIEW BeershopService_TypeChecksSibling AS SELECT diff --git a/__tests__/lib/pg/service.test.js b/__tests__/lib/pg/service.test.js index d97646c..ab53c4a 100644 --- a/__tests__/lib/pg/service.test.js +++ b/__tests__/lib/pg/service.test.js @@ -1,5 +1,6 @@ const cds = require('@sap/cds') const deploy = require('@sap/cds/lib/dbs/cds-deploy') +const { fs } = require('@sap/cds/lib/utils/cds-utils') // const path = require('path') // mock (package|.cds'rc).json entries @@ -236,6 +237,27 @@ describe.each(suiteEnvironments)( ibu: 20 }) }) + + test('odata: stream -> sql: upload/dowload stream Media Content', async () => { + //Create a media + const response = await request.post('/beershop/TypeChecks').send({ + type_mediaType: 'text/plain' + }) + + const entry = JSON.parse(response.text) + + const mediaPath = `/beershop/TypeChecks(${entry.ID})/type_mediaContent` + + const buffer = fs.readFileSync('__tests__/__assets__/test.sql') + + // Upload file + await request.put(mediaPath).attach('file', buffer).expect(204) + + // Download file + const mediaResponse = await request.get(mediaPath).send().responseType('blob').expect(200) + + expect(mediaResponse.body.toString().includes(buffer.toString())).toEqual(true) + }) }) describe('odata: GET on Draft enabled Entity -> sql: SELECT', () => { diff --git a/lib/pg/execute.js b/lib/pg/execute.js index 10e475b..d4595e2 100644 --- a/lib/pg/execute.js +++ b/lib/pg/execute.js @@ -5,6 +5,8 @@ const { postProcess, getPostProcessMapper } = require('@sap/cds/libx/_runtime/db const { PG_TYPE_CONVERSION_MAP } = require('./converters/conversion') const { flattenArray } = require('./utils/deep') const { remapColumnNames } = require('./utils/columns') +const { Readable } = require('stream') + const DEBUG = cds.debug('cds-pg|sql') /* @@ -23,14 +25,17 @@ const DEBUG = cds.debug('cds-pg|sql') * @param {*} txTimestamp * @return {import('pg').QueryArrayResult} */ -const executeGenericCQN = (model, dbc, query, user, locale, txTimestamp) => { +const executeGenericCQN = async (model, dbc, query, user, locale, txTimestamp) => { const { sql, values = [] } = _cqnToSQL(model, query, user, locale, txTimestamp) + + const vals = await _convertStreamValues(values) + if (/^\s*insert/i.test(sql)) { return executeInsertCQN(model, dbc, query, user, locale, txTimestamp) } const isOne = query.SELECT && query.SELECT.one const postPropertyMapper = getPostProcessMapper(PG_TYPE_CONVERSION_MAP, model, query) - return _executeSQLReturningRows(dbc, sql, values, isOne, postPropertyMapper) + return _executeSQLReturningRows(dbc, sql, vals, isOne, postPropertyMapper) } /** @@ -231,12 +236,61 @@ const executeUpdateCQN = async (model, dbc, cqn, user, locale, txTimestamp) => { return Array.isArray(result) ? result.length : result } +/** + * Processes a SELECT CQN statement and executes the query against the database. + * The result rows are processed and returned. + * @param {Object} model + * @param {import('pg').PoolClient} dbc + * @param {Object} query + * @param {*} user + * @param {String} locale + * @param {*} txTimestamp + * @return {import('pg-query-stream').QueryStream} + */ +const executeSelectStreamCQN = async (model, dbc, query, user, locale, txTimestamp) => { + const result = await executeSelectCQN(model, dbc, query, user, locale, txTimestamp) + + if (result == null || result.length === 0) { + return + } + + let val = Array.isArray(result) ? Object.values(result[0])[0] : Object.values(result)[0] + if (val === null) { + return null + } + if (typeof val === 'number') { + val = val.toString() + } + + const stream_ = Readable.from(val) + + return { value: stream_ } +} + +const _convertStreamValues = async (values) => { + let any + values.forEach((v, i) => { + if (v && typeof v.pipe === 'function') { + any = values[i] = new Promise((resolve) => { + const chunks = [] + v.on('data', (chunk) => chunks.push(chunk)) + v.on('end', () => resolve(Buffer.concat(chunks))) + v.on('error', () => { + v.removeAllListeners('error') + v.push(null) + }) + }) + } + }) + return any ? Promise.all(values) : values +} + module.exports = { delete: executeGenericCQN, insert: executeInsertCQN, update: executeUpdateCQN, read: executeSelectCQN, - //stream: executeSelectStreamCQN, + stream: executeSelectStreamCQN, cqn: executeGenericCQN, sql: executePlainSQL }