Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Period_prior_to_index function #35

Open
wants to merge 25 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/src/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
This is a list of documentation associated with every single **exported** function from `OMOPCDMPathways`.
There are a few different sections with a brief explanation of what these sections are followed by relevant functions.


## Pre-Processing

This family of functions are dedicated to pre-process the Data.

```@docs
calculate_era_duration
period_prior_to_index
```
```
145 changes: 92 additions & 53 deletions src/preprocessing.jl
Original file line number Diff line number Diff line change
@@ -1,83 +1,122 @@
using Dates

function Dummy(
drug_exposure_ids,
conn;
tab = drug_exposure
)
"""
# Example:

df = DBInterface.execute(conn, Dummy(drug_exposure_ids; tab=tab)) |> DataFrame
period_prior_to_index(
cohort_id = [1, 1, 1, 1, 1],
conn;
date_prior = Day(100),
tab=cohort
)

return df
end
# Implemetation:
(1) Constructs a SQL query to select cohort_definition_id, subject_id, and cohort_start_date from a specified table, filtering by cohort_id.
(2) Executes the constructed SQL query using a database connection, fetching the results into a DataFrame.
(3) If the DataFrame is not empty, converts cohort_start_date to DateTime and subtracts date_prior from each date, then returns the modified DataFrame.

Given `cohort_id's` , return a `DataFrame` with the `cohort_start_date` adjusted to prior each subjects' cohort entry date (i.e. their `cohort_start_date`)

# Arguments:

- `cohort_id` - vector of cohort IDs
- `conn` - database connection

# Keyword Arguments:

function Dummy(
drug_exposure_ids;
tab = drug_exposure
)
- `date_prior::Dates.AbstractTime` - how much time prior the index date should be adjusted by; accepts a `Dates.AbstractTim`e object such as `Day`, `Month`, etc. (Default: `Day(100)`)
- `tab` - the `SQLTable` representing the cohort table. (Default: `cohort`)

sql =
From(tab) |>
Where(Fun.in(Get.drug_exposure_id, drug_exposure_ids...)) |>
Select(Get.drug_exposure_id, Get.drug_exposure_start_date) |>
q -> render(q, dialect=dialect)
# Returns

- DataFrame with the `cohort_start_date` adjusted by the `date_prior`.

"""
Jay-sanjay marked this conversation as resolved.
Show resolved Hide resolved
function period_prior_to_index(cohort_id::Vector, conn; date_prior=Day(100), tab=cohort)

return String(sql)
# Construct the SQL query
sql = From(tab) |>
Where(Fun.in(Get.cohort_definition_id, cohort_id...)) |>
Select(Get.cohort_definition_id, Get.subject_id, Get.cohort_start_date) |>
q -> render(q, dialect=dialect)

# Execute the SQL query and fetch the result into a DataFrame
df = DBInterface.execute(conn, String(sql)) |> DataFrame

if nrow(df) > 0
# Convert the cohort_start_date to DateTime and subtract the date_prior
df.cohort_start_date = DateTime.(df.cohort_start_date) .- date_prior
else
error("Invalid DataFrame: $df")

Check warning on line 50 in src/preprocessing.jl

View check run for this annotation

Codecov / codecov/patch

src/preprocessing.jl#L50

Added line #L50 was not covered by tests
end

return df
end


"""
```julia
calculate_era_duration(
treatment_history::DataFrame,
minEraDuration::Real
)
```
#Example:

Given a treatment history dataframe, this function filters out rows where the difference between `drug_exposure_start` and `drug_exposure_end` is less than `minEraDuration`.
function start_date_on_person(cohort_id::Vector, tables, conn)

# Arguments:
tab = tables[:cohort]
date_prior = Day(100)

- `treatment_history::DataFrame` - treatment history dataframe.
- `minEraDuration::Real` - minimum duration of an era.
sql = From(tab) |>
Where(Fun.in(Get.cohort_definition_id, cohort_id...)) |>
Select(Get.cohort_definition_id, Get.subject_id, Get.cohort_start_date) |>
q -> render(q, dialect = :sqlite)

# Returns:
df = DBInterface.execute(conn, String(sql)) |> DataFrame

- Updated `DataFrame`, rows where the difference between `drug_exposure_start` and `drug_exposure_end` is less than `minEraDuration` are filtered out.
# Check if the DataFrame is not empty
if nrow(df) > 0
# Convert the cohort_start_date to DateTime and subtract the date_prior
df.cohort_start_date = DateTime.(df.cohort_start_date) .- date_prior
else
error("Invalid DataFrame")
end

return df
end

# Note:
period_prior_to_index(
cohort_id = [1, 1, 1, 1, 1],
index_date_func = start_date_on_person,
conn;
)

It filters the treatment history `DataFrame` to retain only those rows where the duration between `drug_exposure_end` and `drug_exposure_start` is at least `minEraDuration`.
# Implementation:
(1) Calls GenerateTables with the database connection conn to generate tables, specifying inplace = false and exported = true.
(2) Invokes the index_date_func function, passing cohort_id, the generated tables, and the connection conn, to obtain a DataFrame df.
(3) Returns the DataFrame df.

# Example:
function period_prior_to_index(person_ids::Vector, index_date_func::Function, conn; date_prior=Day(100))

```julia-repl
julia> test_person_ids = [1, 1, 1, 1, 1];
Given a vector of person IDs, this function returns a DataFrame with the cohort_start_date adjusted by the date_prior.

julia> test_drug_start_date = [-3.727296e8, 2.90304e7, -5.333472e8, -8.18208e7, 1.3291776e9];
# Arguments:

julia> test_drug_end_date = [-364953600, 31449600, -532483200, -80006400, 1330387200];
- `cohort_id` - vector of cohort IDs
- `index_date_func` - function that returns the SQL query to get the start date of the person
- `conn` - database connection

julia> test_df = DataFrame(person_id = test_person_ids, drug_exposure_start = test_drug_start_date, drug_exposure_end = test_drug_end_date);
# Returns

- DataFrame with the `cohort_start_date` adjusted by the `date_prior`.

julia> calculate_era_duration(test_df, 920000)
4×3 DataFrame
Row │ person_id drug_exposure_start drug_exposure_end
│ Int64 Float64 Int64
─────┼───────────────────────────────────────────────────
1 │ 1 -3.7273e8 -364953600
2 │ 1 2.90304e7 31449600
3 │ 1 -8.18208e7 -80006400
4 │ 1 1.32918e9 1330387200
```
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an example section here about how to call this function and how to pass in a function into the argument which accepts a user defined function? So show how to make a function? Thanks!

function calculate_era_duration(treatment_history::DataFrame, minEraDuration)

treatment_history = filter(row -> (row[:drug_exposure_end] - row[:drug_exposure_start]) >= minEraDuration, treatment_history)
function period_prior_to_index(
cohort_id::Vector,
index_date_func::Function,
conn;
)

tables = GenerateTables(conn, inplace = false, exported=true)

df = index_date_func(cohort_id, tables, conn)

return treatment_history
return df
end

export Dummy, calculate_era_duration
export period_prior_to_index
64 changes: 37 additions & 27 deletions test/Data-Preprocessing/preprocessing.jl
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
using Test

@testset "Dummy Tests" begin
@testset "Period Prior to Index Tests" begin
MakeTables(sqlite_conn, :sqlite, "main")
test_drug_exposure_ids = [1.0, 2.0, 3.0, 4.0]
test_drug_exposure_start_date = [-3.727296e8, 2.90304e7, -5.333472e8, -8.18208e7]
test_df1 = DataFrame(drug_exposure_id = test_drug_exposure_ids, drug_exposure_start_date = test_drug_exposure_start_date)
result = Dummy(test_drug_exposure_ids, sqlite_conn)

@test test_drug_exposure_start_date == result.drug_exposure_start_date[1:4]
@test test_drug_exposure_ids == result.drug_exposure_id[1:4]

end



@testset "Calculate Era Duration Tests" begin
MakeTables(sqlite_conn, :sqlite, "main")

test_person_ids = [1, 1, 1, 1, 1]
test_drug_start_date = [-3.727296e8, 2.90304e7, -5.333472e8, -8.18208e7, 1.3291776e9]
test_drug_end_date = [-364953600, 31449600, -532483200, -80006400, 1330387200]
test_subject_ids = [1.0, 5.0, 9.0, 11.0, 12.0]
test_cohort_start_date = [-3.7273e8, 2.90304e7, -5.33347e8, -8.18208e7, 1.32918e9]

test_df2 = DataFrame(person_id = test_person_ids, cohort_start_date = test_cohort_start_date)

test_df3 = DataFrame(person_id = test_person_ids, drug_exposure_start = test_drug_start_date, drug_exposure_end = test_drug_end_date)
result = period_prior_to_index(test_person_ids, sqlite_conn)

@test test_person_ids == result.cohort_definition_id[1:5]
@test test_subject_ids == result.subject_id[1:5]

function start_date_on_person(cohort_id::Vector, tables, conn)

tab = tables[:cohort]
date_prior = Day(100)

sql = From(tab) |>
Where(Fun.in(Get.cohort_definition_id, cohort_id...)) |>
Select(Get.cohort_definition_id, Get.subject_id, Get.cohort_start_date) |>
q -> render(q, dialect = :sqlite)

df = DBInterface.execute(conn, String(sql)) |> DataFrame

# Check if the DataFrame is not empty
if nrow(df) > 0
# Convert the cohort_start_date to DateTime and subtract the date_prior
df.cohort_start_date = DateTime.(df.cohort_start_date) .- date_prior
else
error("Invalid DataFrame: $df")
end

return df
end

result = period_prior_to_index(test_person_ids, start_date_on_person, sqlite_conn)

expected_person_id = [1, 1, 1, 1]
expected_drug_exposure_start = [-3.727296e8, 2.90304e7, -8.18208e7, 1.3291776e9]
expected_drug_exposure_end = [-364953600, 31449600, -80006400, 1330387200]
@test test_person_ids == result.cohort_definition_id[1:5]
@test test_subject_ids == result.subject_id[1:5]
Jay-sanjay marked this conversation as resolved.
Show resolved Hide resolved

result = calculate_era_duration(test_df3, 920000)

@test expected_person_id == result.person_id[1:4]
@test expected_drug_exposure_start == result.drug_exposure_start[1:4]
@test expected_drug_exposure_end == result.drug_exposure_end[1:4]
end
end
56 changes: 56 additions & 0 deletions test/assets/strep_throat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"ConceptSets": [
{
"id": 0,
"name": "[jz] Strep Throat Concepts",
"expression": {
"items": [
{
"concept": {
"CONCEPT_CLASS_ID": "Clinical Finding",
"CONCEPT_CODE": "43878008",
"CONCEPT_ID": 28060,
"CONCEPT_NAME": "Streptococcal sore throat",
"DOMAIN_ID": "Condition",
"INVALID_REASON": "V",
"INVALID_REASON_CAPTION": "Valid",
"STANDARD_CONCEPT": "S",
"STANDARD_CONCEPT_CAPTION": "Standard",
"VOCABULARY_ID": "SNOMED"
}
}
]
}
}
],
"PrimaryCriteria": {
"CriteriaList": [
{
"ConditionOccurrence": {
"CodesetId": 0
}
}
],
"ObservationWindow": {
"PriorDays": 0,
"PostDays": 0
},
"PrimaryCriteriaLimit": {
"Type": "All"
}
},
"QualifiedLimit": {
"Type": "First"
},
"ExpressionLimit": {
"Type": "All"
},
"InclusionRules": [],
"CensoringCriteria": [],
"CollapseSettings": {
"CollapseType": "ERA",
"EraPad": 0
},
"CensorWindow": {},
"cdmVersionRange": ">=5.0.0"
}
13 changes: 13 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ sqlite_conn = SQLite.DB(Eunomia())
GenerateDatabaseDetails(:sqlite, "main")
GenerateTables(sqlite_conn)

cohort = read("./assets/strep_throat.json", String)

#using DBInterface

model = Model(cdm_version=v"5.3.1", cdm_schema="main",
vocabulary_schema="main", results_schema="main",
target_schema="main", target_table="cohort");

sql = translate(cohort, dialect=:sqlite, model=model,
cohort_definition_id=1);

[DBI.execute(sqlite_conn, sub_query) for sub_query in split(sql, ";")[1:end-1]]

@testset "OMOPCDMPathways" begin
@testset "Data-Preprocessing" begin
include("Data-Preprocessing/preprocessing.jl")
Expand Down
Loading