adjustments
sehnem committed Jul 6, 2023
1 parent c317d1a commit 23624bf
Showing 15 changed files with 1,701 additions and 197 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -12,7 +12,7 @@ jobs:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
12 changes: 0 additions & 12 deletions .pre-commit-config.yaml
@@ -19,20 +19,8 @@ repos:
- id: check-dependabot
- id: check-github-workflows

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.269
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]

- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
additional_dependencies:
- types-requests
57 changes: 28 additions & 29 deletions README.md
@@ -4,39 +4,46 @@

Built with the [Meltano Tap SDK](https://sdk.meltano.com) for Singer Taps.

<!--
Developer TODO: Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPi repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.

## Installation

Install from PyPi:
```bash
pipx install tap-shopify
```
Install from GitHub:

```bash
-pipx install git+https://github.com/ORG_NAME/tap-shopify.git@main
+pipx install git+https://github.com/sehnem/tap-shopify.git@master
```

-->

## Configuration

### Accepted Config Options

<!--
Developer TODO: Provide a list of config options accepted by the tap.
This section can be created by copy-pasting the CLI output from:
```
tap-shopify --about --format=markdown
```
-->
## Capabilities

* `catalog`
* `state`
* `discover`
* `about`
* `stream-maps`
* `schema-flattening`
* `batch`

## Settings

| Setting | Required | Default | Description |
|:--------------------|:--------:|:-------:|:------------|
| auth_token | True | None | The token to authenticate against the API service |
| shop | True | None | The Shopify shop name |
| api_version | False | 2023-04 | The version of the API to use |
| start_date | False | None | The earliest record date to sync |
| bulk | False | false | Whether to use the bulk API |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth| False | None | The max depth to flatten schemas. |
| batch_config | False | None | |

A full list of supported settings and capabilities is available by running: `tap-shopify --about`
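
As a rough illustration of the settings table above, the sketch below builds a config dictionary and writes it to a JSON file that could be passed to the tap with `--config`. The shop name, token, `start_date` format, and the helper names `missing_required` and `write_config` are placeholders and assumptions for illustration, not values or functions from this repository.

```python
import json
from pathlib import Path

# Placeholder values; replace them with your own shop name and access token.
config = {
    "auth_token": "shpat_example_token",   # required (placeholder)
    "shop": "example-shop",                # required (placeholder)
    "api_version": "2023-04",              # optional; default taken from the table
    "start_date": "2023-01-01T00:00:00Z",  # optional; ISO 8601 format is an assumption
    "bulk": False,                         # optional
}

# Settings marked as required in the table.
REQUIRED = ("auth_token", "shop")


def missing_required(cfg: dict) -> list:
    """Return the required setting names that are absent or empty in cfg."""
    return [key for key in REQUIRED if not cfg.get(key)]


def write_config(cfg: dict, path: str = "config.json") -> Path:
    """Write cfg as JSON after checking that the required settings are present."""
    missing = missing_required(cfg)
    if missing:
        raise ValueError(f"Missing required settings: {missing}")
    target = Path(path)
    target.write_text(json.dumps(cfg, indent=2))
    return target
```

After calling `write_config(config)`, the file can be passed to the tap, for example `tap-shopify --config config.json --discover`.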

A full list of supported settings and capabilities for this
tap is available by running:
@@ -53,9 +60,7 @@ environment variable is set either in the terminal context or in the `.env` file

### Source Authentication and Authorization

<!--
Developer TODO: If your tap requires special access on the source system, or any special authentication requirements, provide those here.
-->
To get the access token for your store, use the following [guide](https://shopify.dev/docs/apps/auth/admin-app-access-tokens).
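
Once a token is available, a request to the Admin GraphQL API carries it in the `X-Shopify-Access-Token` header. The sketch below only builds such a request, without sending it. The helper name `build_graphql_request` is hypothetical, and it assumes `shop` is the bare store name without the `.myshopify.com` suffix; how this tap's `url_base` assembles the URL is not visible in this diff.

```python
import json
import urllib.request


def build_graphql_request(
    shop: str,
    auth_token: str,
    query: str,
    api_version: str = "2023-04",
) -> urllib.request.Request:
    """Build (but do not send) a POST request to the Admin GraphQL endpoint."""
    # Assumption: `shop` is the bare store name, e.g. "example-shop".
    url = f"https://{shop}.myshopify.com/admin/api/{api_version}/graphql.json"
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "X-Shopify-Access-Token": auth_token,
        },
        method="POST",
    )


# Placeholder shop and token; the query asks only for the shop name.
request = build_graphql_request("example-shop", "shpat_example_token", "{ shop { name } }")
```

Passing `request` to `urllib.request.urlopen` would perform the call against a real store.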

## Usage

@@ -100,12 +105,6 @@ poetry run tap-shopify --help
_**Note:** This tap will work in any Singer environment and does not require Meltano.
Examples here are for convenience and to streamline end-to-end orchestration scenarios._

<!--
Developer TODO:
Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any "TODO" items listed in
the file.
-->

Next, install Meltano (if you haven't already) and any needed plugins:

```bash
1,260 changes: 1,260 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "GraphQL-tap-shopify"
-version = "0.0.1"
+version = "1.0.0"
description = "`tap-shopify` is a Singer tap for Shopify, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Josué Sehnem"]
@@ -14,7 +14,7 @@ packages = [
]

[tool.poetry.dependencies]
-python = "<3.12,>=3.7.1"
+python = "<3.12,>=3.8"
singer-sdk = { version="^0.28.0" }
fs-s3fs = { version = "^1.1.1", optional = true }
requests = "^2.31.0"
@@ -37,7 +37,7 @@ ignore = [
]
select = ["ALL"]
src = ["tap_shopify"]
-target-version = "py37"
+target-version = "py38"


[tool.ruff.flake8-annotations]
164 changes: 51 additions & 113 deletions tap_shopify/client.py
@@ -2,22 +2,33 @@

from __future__ import annotations

import math
from functools import cached_property
from time import sleep
from typing import Any, Dict, Iterable, List, Optional, Union
from inspect import stack
from typing import Any, Optional

import requests # noqa: TCH002
from singer_sdk import typing as th
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import GraphQLStream

from tap_shopify.gql_queries import (
query_incremental,
schema_query,
simple_query,
simple_query_incremental,
)
from tap_shopify.gql_queries import schema_query


def verify_recursion(func):
"""Verify if the stream is recursive."""
objs = []

def wrapper(*args, **kwargs):
if not [f for f in stack() if f.function == func.__name__]:
objs.clear()
field_name = args[1]["name"]
field_kind = args[1]["kind"]
if field_name not in objs:
if field_kind == "OBJECT":
objs.append(args[1]["name"])
result = func(*args, **kwargs)
return result

return wrapper


class ShopifyStream(GraphQLStream):
@@ -30,7 +41,7 @@ class ShopifyStream(GraphQLStream):
restore_rate = None
max_points = None
single_object_params = None
recursive_objs = []
ignore_objs = []

@property
def url_base(self) -> str:
@@ -60,6 +71,7 @@ def schema_gql(self) -> dict:
)
return response.json()["data"]["__schema"]["types"]

@verify_recursion
def extract_field_type(self, field) -> str:
"""Extract the field type from the schema."""
type_mapping = {
@@ -71,25 +83,22 @@ def extract_field_type(self, field) -> str:
name = field["name"]
kind = field["kind"]
if kind == "ENUM":
field_type = th.StringType
return th.StringType
elif kind == "NON_NULL":
type_def = field.get("type", field)["ofType"]
field_type = self.extract_field_type(type_def)
return self.extract_field_type(type_def)
elif kind == "SCALAR":
field_type = type_mapping.get(name, th.StringType)
return type_mapping.get(name, th.StringType)
elif kind == "OBJECT":
obj_schema = self.extract_gql_schema(name)
properties = self.get_fields_schema(obj_schema["fields"])
if not properties:
return
field_type = th.ObjectType(*properties)
if properties:
return th.ObjectType(*properties)
elif kind == "LIST":
obj_type = field["ofType"]["ofType"]
list_field_type = self.extract_field_type(obj_type)
field_type = th.ArrayType(list_field_type)
else:
return
return field_type
if list_field_type:
return th.ArrayType(list_field_type)

def get_fields_schema(self, fields) -> dict:
"""Build the schema for the stream."""
@@ -99,17 +108,17 @@ def get_fields_schema(self, fields) -> dict:
# Ignore all the fields that need arguments
if field.get("args") or field.get("isDeprecated"):
continue
if field_name in self.recursive_objs:
if field_name in self.ignore_objs:
continue
if field["type"]["kind"] == "INTERFACE":
continue

if field["type"]["ofType"]:
type_def = field.get("type", field)["ofType"]
field_type = self.extract_field_type(type_def)
if field_type:
property = th.Property(field_name, field_type)
properties.append(property)
type_def = field.get("type", field)
type_def = type_def["ofType"] or type_def
field_type = self.extract_field_type(type_def)
if field_type:
property = th.Property(field_name, field_type)
properties.append(property)
return properties

def extract_gql_schema(self, gql_type):
@@ -118,9 +127,23 @@ def extract_gql_schema(self, gql_type):
schema_gen = (s for s in self.schema_gql if s["name"].lower() == gql_type_lw)
return next(schema_gen, None)

@cached_property
def catalog_dict(self):
"""Return the catalog for the stream."""
if getattr(self._tap, "input_catalog"):
catalog = self._tap.input_catalog.to_dict()
return catalog["streams"]
return {}

@cached_property
def schema(self) -> dict:
"""Return the schema for the stream."""
if getattr(self._tap, "input_catalog"):
streams = self.catalog_dict
stream = (s for s in streams if s["tap_stream_id"] == self.name)
stream_catalog = next(stream, None)
if stream_catalog:
return stream_catalog["schema"]
stream_type = self.extract_gql_schema(self.gql_type)
properties = self.get_fields_schema(stream_type["fields"])
return th.PropertiesList(*properties).to_dict()
@@ -167,88 +190,3 @@ def prepare_request_payload(
}
self.logger.debug(f"Attempting query:\n{query}")
return request_data

@property
def page_size(self) -> int:
"""Return the page size for the stream."""
if not self.available_points:
return 1
pages = self.available_points / self.query_cost
if pages < 5:
points_to_restore = self.max_points - self.available_points
sleep(points_to_restore // self.restore_rate - 1)
pages = (self.max_points - self.restore_rate) / self.query_cost
pages = pages - 1
elif self.query_cost and pages > 5:
if self.query_cost * pages >= 1000:
pages = math.floor(1000 / self.query_cost)
else:
pages = 250 if pages > 250 else pages
return int(pages)

@cached_property
def query(self) -> str:
"""Set or return the GraphQL query string."""
if not self.replication_key and not self.single_object_params:
base_query = simple_query
elif self.single_object_params:
base_query = simple_query_incremental
else:
base_query = query_incremental

query = base_query.replace("__query_name__", self.query_name)
query = query.replace("__selected_fields__", self.gql_selected_fields)

return query

def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Any:
"""Return token identifying next page or None if all records have been read."""
if not self.replication_key:
return None
response_json = response.json()
has_next_json_path = f"$.data.{self.query_name}.pageInfo.hasNextPage"
has_next = next(extract_jsonpath(has_next_json_path, response_json))
if has_next:
cursor_json_path = f"$.data.{self.query_name}.edges[-1].cursor"
all_matches = extract_jsonpath(cursor_json_path, response_json)
return next(all_matches, None)
return None

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params = {}
params["first"] = self.page_size
if next_page_token:
params["after"] = next_page_token
if self.replication_key:
start_date = self.get_starting_timestamp(context)
if start_date:
date = start_date.strftime("%Y-%m-%dT%H:%M:%S")
params["filter"] = f"updated_at:>{date}"
if self.single_object_params:
params = self.single_object_params
return params

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
if self.replication_key:
json_path = f"$.data.{self.query_name}.edges[*].node"
else:
json_path = f"$.data.{self.query_name}"
response = response.json()

if response.get("errors"):
raise Exception(response["errors"])

cost = response["extensions"].get("cost")
if not self.query_cost:
self.query_cost = cost.get("requestedQueryCost")
self.available_points = cost["throttleStatus"].get("currentlyAvailable")
self.restore_rate = cost["throttleStatus"].get("restoreRate")
self.max_points = cost["throttleStatus"].get("maximumAvailable")

yield from extract_jsonpath(json_path, input=response)
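
The `verify_recursion` decorator added in this file appears intended to keep `extract_field_type` from descending again into an object type it has already visited. The original indentation is lost in this view, so that reading is an assumption. The sketch below shows a related but different technique: instead of inspecting the call stack, it passes the path of types currently being expanded as an explicit argument. `TOY_SCHEMA` and `expand` are hypothetical names, not part of the tap.

```python
from typing import Optional

# Toy stand-in for introspection output: type name -> {field name: field type}.
# Order and Customer reference each other, which would recurse forever unguarded.
TOY_SCHEMA = {
    "Order": {"id": "ID", "customer": "Customer"},
    "Customer": {"name": "String", "lastOrder": "Order"},
}


def expand(type_name: str, _path: tuple = ()) -> Optional[dict]:
    """Expand a type into nested fields, skipping types already on the path."""
    if type_name not in TOY_SCHEMA:
        # Anything not described in the toy schema is treated as a scalar leaf.
        return {"type": type_name}
    if type_name in _path:
        # This type is already being expanded higher up: stop here.
        return None
    result = {}
    for field_name, field_type in TOY_SCHEMA[type_name].items():
        expanded = expand(field_type, _path + (type_name,))
        if expanded is not None:
            result[field_name] = expanded
    return result


order_fields = expand("Order")
```

Here `Customer.lastOrder` is dropped while expanding `Order`, because `Order` is already on the path. Tracking the path per call avoids shared state between unrelated top-level calls, at the cost of an extra argument.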