Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: support zerocopysend asgi extension #20

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion asgi_webdav/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class Config(BaseModel):
compression: Compression = Compression()
cors: CORS = CORS()
enable_dir_browser: bool = True

enable_asgi_zero_copy: bool = False
# other
logging: Logging = Logging()
sentry_dsn: str | None = None
Expand Down
36 changes: 36 additions & 0 deletions asgi_webdav/helpers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
import functools
import hashlib
import os
import re
import xml.parsers.expat
from collections.abc import AsyncGenerator, Callable
from logging import getLogger
from mimetypes import guess_type as orig_guess_type
from pathlib import Path
from typing import Any, TypeVar

import aiofiles
import xmltodict
Expand All @@ -14,6 +18,7 @@
from asgi_webdav.constants import RESPONSE_DATA_BLOCK_SIZE

logger = getLogger(__name__)
T = TypeVar("T")


async def receive_all_data_in_one_call(receive: Callable) -> bytes:
Expand Down Expand Up @@ -154,3 +159,34 @@ def dav_xml2dict(data: bytes) -> dict | None:
return None

return data


async def run_in_threadpool(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a blocking callable in the loop's default executor and await it.

    Args:
        func: The synchronous callable to execute off the event loop.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.
    """
    # Inside a coroutine there is always a running loop; get_running_loop()
    # is the preferred way to obtain it (get_event_loop() has policy-dependent
    # behaviour and is discouraged in coroutines).
    loop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so bind
    # everything up front with functools.partial.
    bound_call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(None, bound_call)


async def iter_fd(
    file: int, offset: int | None = None, count: int | None = None
) -> AsyncGenerator[bytes, None]:
    """Yield the contents of an OS-level file descriptor in chunks.

    Each read is executed through ``run_in_threadpool`` so the blocking
    ``os.lseek`` / ``os.read`` calls do not run on the event loop.

    Args:
        file: An open file descriptor. It is neither duplicated nor closed
            here; the caller keeps ownership.
        offset: If given, the descriptor is first positioned at this absolute
            offset (``os.SEEK_SET``). If ``None``, reading starts at the
            descriptor's current position.
        count: Maximum number of bytes to yield in total. If ``None``, read
            until end of file.

    Yields:
        Non-empty ``bytes`` chunks of at most ``RESPONSE_DATA_BLOCK_SIZE``
        bytes. Nothing is yielded when there is no data to read.
    """
    if offset is not None:
        await run_in_threadpool(os.lseek, file, offset, os.SEEK_SET)

    # ``None`` means "no limit, read until EOF".
    remaining = count
    while remaining is None or remaining > 0:
        if remaining is None:
            length = RESPONSE_DATA_BLOCK_SIZE
        else:
            length = min(RESPONSE_DATA_BLOCK_SIZE, remaining)

        data = await run_in_threadpool(os.read, file, length)
        if not data:
            # An empty read is the only reliable EOF signal; a short read is
            # not. Stopping here also avoids emitting empty chunks when the
            # file is shorter than ``count``.
            break

        if remaining is not None:
            # Account for the bytes actually read, not the bytes requested,
            # so a short read does not truncate the overall transfer.
            remaining -= len(data)
        yield data
10 changes: 7 additions & 3 deletions asgi_webdav/provider/dev_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from asgi_webdav.lock import DAVLock
from asgi_webdav.property import DAVProperty, DAVPropertyBasicData
from asgi_webdav.request import DAVRequest
from asgi_webdav.response import DAVResponse, DAVResponseType
from asgi_webdav.response import DAVResponse, DAVResponseType, DAVZeroCopySendData

logger = getLogger(__name__)

Expand Down Expand Up @@ -465,12 +465,16 @@ async def _do_mkcol(self, request: DAVRequest) -> int:

async def do_get(
self, request: DAVRequest
) -> tuple[int, DAVPropertyBasicData | None, AsyncGenerator | None]:
) -> tuple[
int, DAVPropertyBasicData | None, DAVZeroCopySendData | AsyncGenerator | None
]:
return await self._do_get(request)

async def _do_get(
self, request: DAVRequest
) -> tuple[int, DAVPropertyBasicData | None, AsyncGenerator | None]:
) -> tuple[
int, DAVPropertyBasicData | None, DAVZeroCopySendData | AsyncGenerator | None
]:
# 404, None, None
# 200, DAVPropertyBasicData, None # is_dir
# 200/206, DAVPropertyBasicData, AsyncGenerator # is_file
Expand Down
58 changes: 54 additions & 4 deletions asgi_webdav/provider/file_system.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import json
import os
import shutil
from collections.abc import AsyncGenerator
from logging import getLogger
from pathlib import Path
from stat import S_ISDIR
from typing import Union

import aiofiles
import aiofiles.os
import aiofiles.ospath

from asgi_webdav.config import get_config
from asgi_webdav.constants import (
RESPONSE_DATA_BLOCK_SIZE,
DAVDepth,
Expand All @@ -22,6 +25,7 @@
from asgi_webdav.property import DAVProperty, DAVPropertyBasicData
from asgi_webdav.provider.dev_provider import DAVProvider
from asgi_webdav.request import DAVRequest
from asgi_webdav.response import DAVResponse, DAVZeroCopySendData

logger = getLogger(__name__)

Expand Down Expand Up @@ -139,6 +143,29 @@ async def _dav_response_data_generator(
yield data, more_body


async def open_for_sendfile(
    path: Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
) -> int:
    """Open ``path`` read-only and return the raw OS file descriptor.

    The blocking ``os.open`` call is executed through ``run_in_threadpool``.
    The caller owns the returned descriptor and is responsible for closing
    it.

    Args:
        path: Filesystem path of the file to open.

    Returns:
        The integer file descriptor returned by ``os.open``.
    """
    # Function-scope import: run_in_threadpool is not among the names this
    # module is seen importing at top level, so bring it into scope here.
    from asgi_webdav.helpers import run_in_threadpool

    # os.O_BINARY exists only on Windows, where it disables newline
    # translation; on other platforms the flag is absent and contributes 0.
    flags = os.O_RDONLY | getattr(os, "O_BINARY", 0)
    return await run_in_threadpool(os.open, path, flags)


def can_zerocopysend(header: dict[bytes, bytes]) -> bool:
    """Decide whether a response with these headers may use zero-copy send.

    Zero-copy send is used only when it is enabled in the configuration and
    ``DAVResponse.can_be_compressed`` reports that the response's content
    type is not compressible.

    Args:
        header: Response headers; the ``b"Content-Type"`` entry is consulted
            and treated as empty when missing.

    Returns:
        ``True`` if zero-copy send may be used, ``False`` otherwise.
    """
    settings = get_config()
    if not settings.enable_asgi_zero_copy:
        return False

    content_type = header.get(b"Content-Type", b"").decode("utf-8")
    compressible = DAVResponse.can_be_compressed(
        content_type,
        settings.compression.content_type_user_rule,
    )
    return not compressible


class FileSystemProvider(DAVProvider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -330,7 +357,9 @@ async def _do_mkcol(self, request: DAVRequest) -> int:

async def _do_get(
self, request: DAVRequest
) -> tuple[int, DAVPropertyBasicData | None, AsyncGenerator | None]:
) -> tuple[
int, DAVPropertyBasicData | None, DAVZeroCopySendData | AsyncGenerator | None
]:
fs_path = self._get_fs_path(request.dist_src_path, request.user.username)
if not await aiofiles.ospath.exists(fs_path):
return 404, None, None
Expand All @@ -342,17 +371,38 @@ async def _do_get(
if fs_path.is_dir():
return 200, dav_property.basic_data, None

# is fd
if can_zerocopysend(dav_property.basic_data.get_get_head_response_headers()):
file = await open_for_sendfile(fs_path)
if request.content_range:
data = DAVZeroCopySendData(
file=file,
offset=request.content_range_start,
count=request.content_range_end - request.content_range_start,
)
http_status = 206
else:
data = DAVZeroCopySendData(file=file)
http_status = 200
# type is file
if request.content_range:
elif request.content_range:
data = _dav_response_data_generator(
fs_path,
content_range_start=request.content_range_start,
content_range_end=request.content_range_end,
)
http_status = 206
else:
data = _dav_response_data_generator(fs_path)
http_status = 200
if request.content_range:
data = _dav_response_data_generator(
fs_path,
content_range_start=request.content_range_start,
content_range_end=request.content_range_end,
)
http_status = 206
else:
data = _dav_response_data_generator(fs_path)
http_status = 200

return http_status, dav_property.basic_data, data

Expand Down
Loading