This repository has been archived by the owner on Jul 14, 2023. It is now read-only.

Stream (#389)
* test: media type save and read

* feat: upload and download stream file

* test: remove duplicated test case

---------

Co-authored-by: ryegrost8 <[email protected]>
ryegrosT8 and ryegrost8 committed Mar 16, 2023
1 parent e95c8ed commit e0eb956
Showing 5 changed files with 93 additions and 5 deletions.
2 changes: 2 additions & 0 deletions __tests__/__assets__/beershop-admin-service.sql
@@ -42,6 +42,8 @@ CREATE TABLE csw_TypeChecks (
type_Binary CHAR(100),
type_LargeBinary BYTEA,
type_LargeString TEXT,
type_mediaType VARCHAR(5000),
type_mediaContent BYTEA,
PRIMARY KEY(ID)
);

2 changes: 2 additions & 0 deletions __tests__/__assets__/cap-proj/db/schema.cds
@@ -33,6 +33,8 @@ entity TypeChecks : cuid {
type_Binary : Binary(100);
type_LargeBinary : LargeBinary;
type_LargeString : LargeString;
type_mediaType : String @Core.IsMediaType: true;
type_mediaContent : LargeBinary @Core.MediaType : type_mediaType;
virtual type_virtual : Integer;
}

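The two annotations above are what turn type_mediaContent into an OData media property: @Core.IsMediaType marks type_mediaType as the element holding the MIME type, and @Core.MediaType: type_mediaType tells the runtime to serve type_mediaContent as a raw stream with that content type, addressable via the property path. A minimal sketch of the resulting round trip from a plain Node.js client, assuming Node 18+ (global fetch), a service running at http://localhost:4004, and the /beershop service path used in the tests below; URL, port, and file name are illustrative assumptions, not part of this commit:

const fs = require('fs')

async function mediaRoundTrip() {
  // create an entry that only declares the media type
  const created = await fetch('http://localhost:4004/beershop/TypeChecks', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ type_mediaType: 'text/plain' })
  })
  const { ID } = await created.json()

  const mediaUrl = `http://localhost:4004/beershop/TypeChecks(${ID})/type_mediaContent`

  // upload the raw content to the media property
  await fetch(mediaUrl, {
    method: 'PUT',
    headers: { 'Content-Type': 'text/plain' },
    body: fs.readFileSync('notes.txt') // hypothetical local file
  })

  // download it again; the service streams the stored content back
  const download = await fetch(mediaUrl)
  console.log(await download.text())
}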
12 changes: 10 additions & 2 deletions __tests__/__assets__/test.sql
@@ -36,6 +36,8 @@ CREATE TABLE csw_TypeChecks (
type_Binary CHAR(100),
type_LargeBinary BYTEA,
type_LargeString TEXT,
type_mediaType VARCHAR(5000),
type_mediaContent BYTEA,
PRIMARY KEY(ID)
);

@@ -73,6 +75,8 @@ CREATE TABLE BeershopService_TypeChecksWithDraft_drafts (
type_Binary CHAR(100) NULL,
type_LargeBinary BYTEA NULL,
type_LargeString TEXT NULL,
type_mediaType VARCHAR(5000) NULL,
type_mediaContent BYTEA NULL,
IsActiveEntity BOOLEAN,
HasActiveEntity BOOLEAN,
HasDraftEntity BOOLEAN,
@@ -126,7 +130,9 @@ CREATE VIEW BeershopService_TypeChecks AS SELECT
TypeChecks_0.type_String,
TypeChecks_0.type_Binary,
TypeChecks_0.type_LargeBinary,
TypeChecks_0.type_LargeString
TypeChecks_0.type_LargeString,
TypeChecks_0.type_mediaType,
TypeChecks_0.type_mediaContent
FROM csw_TypeChecks AS TypeChecks_0;

CREATE VIEW BeershopService_TypeChecksWithDraft AS SELECT
@@ -143,7 +149,9 @@ CREATE VIEW BeershopService_TypeChecksWithDraft AS SELECT
TypeChecks_0.type_String,
TypeChecks_0.type_Binary,
TypeChecks_0.type_LargeBinary,
TypeChecks_0.type_LargeString
TypeChecks_0.type_LargeString,
TypeChecks_0.type_mediaType,
TypeChecks_0.type_mediaContent
FROM csw_TypeChecks AS TypeChecks_0;

CREATE VIEW BeershopService_TypeChecksSibling AS SELECT
22 changes: 22 additions & 0 deletions __tests__/lib/pg/service.test.js
@@ -1,5 +1,6 @@
const cds = require('@sap/cds')
const deploy = require('@sap/cds/lib/dbs/cds-deploy')
const { fs } = require('@sap/cds/lib/utils/cds-utils')
// const path = require('path')

// mock (package|.cds'rc).json entries
@@ -236,6 +237,27 @@ describe.each(suiteEnvironments)(
ibu: 20
})
})

test('odata: stream -> sql: upload/download stream Media Content', async () => {
// Create a media entry
const response = await request.post('/beershop/TypeChecks').send({
type_mediaType: 'text/plain'
})

const entry = JSON.parse(response.text)

const mediaPath = `/beershop/TypeChecks(${entry.ID})/type_mediaContent`

const buffer = fs.readFileSync('__tests__/__assets__/test.sql')

// Upload file
await request.put(mediaPath).attach('file', buffer).expect(204)

// Download file
const mediaResponse = await request.get(mediaPath).send().responseType('blob').expect(200)

expect(mediaResponse.body.toString().includes(buffer.toString())).toEqual(true)
})
})

describe('odata: GET on Draft enabled Entity -> sql: SELECT', () => {
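Not part of this commit, but the stored MIME type could be asserted with the same supertest helpers; a sketch, assuming it runs inside the streaming test above where request and entry are in scope:

// read the entity record itself and check the declared media type
const entityResponse = await request.get(`/beershop/TypeChecks(${entry.ID})`).expect(200)
expect(JSON.parse(entityResponse.text).type_mediaType).toEqual('text/plain')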
60 changes: 57 additions & 3 deletions lib/pg/execute.js
@@ -5,6 +5,8 @@ const { postProcess, getPostProcessMapper } = require('@sap/cds/libx/_runtime/db
const { PG_TYPE_CONVERSION_MAP } = require('./converters/conversion')
const { flattenArray } = require('./utils/deep')
const { remapColumnNames } = require('./utils/columns')
const { Readable } = require('stream')

const DEBUG = cds.debug('cds-pg|sql')

/*
@@ -23,14 +25,17 @@ const DEBUG = cds.debug('cds-pg|sql')
* @param {*} txTimestamp
* @return {import('pg').QueryArrayResult}
*/
const executeGenericCQN = (model, dbc, query, user, locale, txTimestamp) => {
const executeGenericCQN = async (model, dbc, query, user, locale, txTimestamp) => {
const { sql, values = [] } = _cqnToSQL(model, query, user, locale, txTimestamp)

const vals = await _convertStreamValues(values)

if (/^\s*insert/i.test(sql)) {
return executeInsertCQN(model, dbc, query, user, locale, txTimestamp)
}
const isOne = query.SELECT && query.SELECT.one
const postPropertyMapper = getPostProcessMapper(PG_TYPE_CONVERSION_MAP, model, query)
return _executeSQLReturningRows(dbc, sql, values, isOne, postPropertyMapper)
return _executeSQLReturningRows(dbc, sql, vals, isOne, postPropertyMapper)
}

/**
@@ -231,12 +236,61 @@ const executeUpdateCQN = async (model, dbc, cqn, user, locale, txTimestamp) => {
return Array.isArray(result) ? result.length : result
}

/**
* Processes a SELECT CQN statement and executes the query against the database.
* The result rows are processed and returned.
* @param {Object} model
* @param {import('pg').PoolClient} dbc
* @param {Object} query
* @param {*} user
* @param {String} locale
* @param {*} txTimestamp
* @return {import('pg-query-stream').QueryStream}
*/
const executeSelectStreamCQN = async (model, dbc, query, user, locale, txTimestamp) => {
const result = await executeSelectCQN(model, dbc, query, user, locale, txTimestamp)

if (result == null || result.length === 0) {
return
}

let val = Array.isArray(result) ? Object.values(result[0])[0] : Object.values(result)[0]
if (val === null) {
return null
}
if (typeof val === 'number') {
val = val.toString()
}

const stream_ = Readable.from(val)

return { value: stream_ }
}

const _convertStreamValues = async (values) => {
let any
values.forEach((v, i) => {
if (v && typeof v.pipe === 'function') {
any = values[i] = new Promise((resolve) => {
const chunks = []
v.on('data', (chunk) => chunks.push(chunk))
v.on('end', () => resolve(Buffer.concat(chunks)))
v.on('error', () => {
v.removeAllListeners('error')
v.push(null)
})
})
}
})
return any ? Promise.all(values) : values
}

module.exports = {
delete: executeGenericCQN,
insert: executeInsertCQN,
update: executeUpdateCQN,
read: executeSelectCQN,
//stream: executeSelectStreamCQN,
stream: executeSelectStreamCQN,
cqn: executeGenericCQN,
sql: executePlainSQL
}
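The helper _convertStreamValues above buffers any Readable that arrives as a bind parameter before the SQL is executed, since the value is ultimately handed to the pg driver as a Buffer for the BYTEA column. A stripped-down illustration of that pattern outside the adapter (helper name and sample data are invented for the example):

const { Readable } = require('stream')

// collect a readable stream into a single Buffer, mirroring the idea in _convertStreamValues
const streamToBuffer = (readable) =>
  new Promise((resolve, reject) => {
    const chunks = []
    readable.on('data', (chunk) => chunks.push(chunk))
    readable.on('end', () => resolve(Buffer.concat(chunks)))
    readable.on('error', reject)
  })

// usage: a stream parameter becomes a Buffer suitable as a BYTEA bind value
streamToBuffer(Readable.from([Buffer.from('hello '), Buffer.from('world')]))
  .then((buf) => console.log(buf.toString())) // logs "hello world"

Unlike the adapter code, this sketch rejects on stream errors rather than ending the stream silently; that is a deliberate simplification for the example.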
