Skip to content

Commit

Permalink
Improved codebase. Isolated modules (aiohttp, gevent) so they don’t c…
Browse files Browse the repository at this point in the history
…ollide
  • Loading branch information
syrusakbary committed Nov 14, 2017
1 parent b3e5459 commit 927ca27
Show file tree
Hide file tree
Showing 21 changed files with 307 additions and 747 deletions.
47 changes: 45 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ python aio.py

## Setup

For setting up, just plug into your AioHTTP server.
### aiohttp

For setting up, just plug into your aiohttp server.

```python
subscription_server = WebSocketSubscriptionServer(schema)
from graphql_ws.aiohttp import AiohttpSubscriptionServer


subscription_server = AiohttpSubscriptionServer(schema)

async def subscriptions(request):
ws = web.WebSocketResponse(protocols=('graphql-ws',))
Expand Down Expand Up @@ -63,3 +68,41 @@ class Subscription(graphene.ObjectType):

schema = graphene.Schema(query=Query, subscription=Subscription)
```


### Gevent

For setting up, just plug into your Gevent server.

```python
subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'

@sockets.route('/subscriptions')
def echo_socket(ws):
subscription_server.handle(ws)
return []
```

And then, plug into a subscribable schema:

```python
import graphene
from rx import Observable


class Query(graphene.ObjectType):
base = graphene.String()


class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())

async def resolve_count_seconds(root, info, up_to=5):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)


schema = graphene.Schema(query=Query, subscription=Subscription)
```
File renamed without changes.
25 changes: 7 additions & 18 deletions examples/aio.py → examples/aiohttp/app.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from aiohttp import web, WSMsgType
from template import render_graphiql
import json

from aiohttp import web
from schema import schema
from graphql import format_error
import json
from graphql_ws import WebSocketSubscriptionServer
from graphql_ws.aiohttp import AiohttpSubscriptionServer

from template import render_graphiql


async def graphql_view(request):
Expand All @@ -21,27 +23,14 @@ async def graphql_view(request):
async def graphiql_view(request):
return web.Response(text=render_graphiql(), headers={'Content-Type': 'text/html'})

subscription_server = WebSocketSubscriptionServer(schema)
subscription_server = AiohttpSubscriptionServer(schema)


async def subscriptions(request):
ws = web.WebSocketResponse(protocols=('graphql-ws',))
await ws.prepare(request)

await subscription_server.handle(ws)

# async for msg in ws:
# if msg.type == WSMsgType.TEXT:
# if msg.data == 'close':
# await ws.close()
# else:
# await ws.send_str(msg.data + '/answer')
# elif msg.type == WSMsgType.ERROR:
# print('ws connection closed with exception %s' %
# ws.exception())

# print('websocket connection closed')

return ws


Expand Down
3 changes: 3 additions & 0 deletions examples/aiohttp/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
graphql_ws
aiohttp>=2.1.0
graphene>=2.0
34 changes: 34 additions & 0 deletions examples/aiohttp/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import random
import asyncio
import graphene


class Query(graphene.ObjectType):
base = graphene.String()


class RandomType(graphene.ObjectType):
seconds = graphene.Int()
random_int = graphene.Int()


class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
random_int = graphene.Field(RandomType)

async def resolve_count_seconds(root, info, up_to=5):
for i in range(up_to):
print("YIELD SECOND", i)
yield i
await asyncio.sleep(1.)
yield up_to

async def resolve_random_int(root, info):
i = 0
while True:
yield RandomType(seconds=i, random_int=random.randint(0, 500))
await asyncio.sleep(1.)
i += 1


schema = graphene.Schema(query=Query, subscription=Subscription)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,43 +1,13 @@
import json

import graphene
from flask import Flask, make_response
from flask_graphql import GraphQLView
from flask_sockets import Sockets
from rx import Observable

from graphql_ws import GeventSubscriptionServer
from template import render_graphiql


class Query(graphene.ObjectType):
base = graphene.String()


class RandomType(graphene.ObjectType):
seconds = graphene.Int()
random_int = graphene.Int()


class Subscription(graphene.ObjectType):

count_seconds = graphene.Int(up_to=graphene.Int())

random_int = graphene.Field(RandomType)


def resolve_count_seconds(root, info, up_to):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)

def resolve_random_int(root, info):
import random
return Observable.interval(1000).map(lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))

schema = graphene.Schema(query=Query, subscription=Subscription)

from graphql_ws.gevent import GeventSubscriptionServer

from schema import schema
from template import render_graphiql

app = Flask(__name__)
app.debug = True
Expand All @@ -48,12 +18,14 @@ def resolve_random_int(root, info):
def graphql_view():
return make_response(render_graphiql())


app.add_url_rule(
'/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=False))

subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'


@sockets.route('/subscriptions')
def echo_socket(ws):
subscription_server.handle(ws)
Expand Down
6 changes: 6 additions & 0 deletions examples/flask_gevent/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
graphql_ws
gevent
flask
flask_graphql
flask_sockets
graphene>=2.0
30 changes: 30 additions & 0 deletions examples/flask_gevent/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import random
import graphene
from rx import Observable


class Query(graphene.ObjectType):
base = graphene.String()


class RandomType(graphene.ObjectType):
seconds = graphene.Int()
random_int = graphene.Int()


class Subscription(graphene.ObjectType):

count_seconds = graphene.Int(up_to=graphene.Int())

random_int = graphene.Field(RandomType)

def resolve_count_seconds(root, info, up_to=5):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)

def resolve_random_int(root, info):
return Observable.interval(1000).map(lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))


schema = graphene.Schema(query=Query, subscription=Subscription)
2 changes: 1 addition & 1 deletion examples/flask_gevent/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def render_graphiql():
</script>
</body>
</html>''').substitute(
GRAPHIQL_VERSION='0.10.2',
GRAPHIQL_VERSION='0.11.7',
SUBSCRIPTIONS_TRANSPORT_VERSION='0.7.0',
subscriptionsEndpoint='ws:https://localhost:5000/subscriptions',
# subscriptionsEndpoint='ws:https://localhost:5000/',
Expand Down
20 changes: 0 additions & 20 deletions examples/schema.py

This file was deleted.

1 change: 0 additions & 1 deletion examples/src/graphql
Submodule graphql deleted from ebcd7f
5 changes: 2 additions & 3 deletions graphql_ws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# -*- coding: utf-8 -*-

"""Top-level package for GraphQL AioWS."""
"""Top-level package for GraphQL WS."""

__author__ = """Syrus Akbary"""
__email__ = '[email protected]'
__version__ = '0.1.0'


from .observable_aiter import setup_observable_extension
from .server import WebSocketSubscriptionServer
from .gevent_server import GeventSubscriptionServer
from .base import BaseConnectionContext, BaseSubscriptionServer

setup_observable_extension()
102 changes: 102 additions & 0 deletions graphql_ws/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from inspect import isawaitable, isasyncgen

from asyncio import ensure_future
from aiohttp import WSMsgType
from graphql.execution.executors.asyncio import AsyncioExecutor

from .base import ConnectionClosedException, BaseConnectionContext, BaseSubscriptionServer

from .constants import (
GQL_CONNECTION_ACK,
GQL_CONNECTION_ERROR,
GQL_COMPLETE
)


class AiohttpConnectionContext(BaseConnectionContext):
async def receive(self):
msg = await self.ws.receive()
if msg.type == WSMsgType.TEXT:
return msg.data
elif msg.type == WSMsgType.ERROR:
raise ConnectionClosedException()

async def send(self, data):
if self.closed:
return
await self.ws.send_str(data)

@property
def closed(self):
return self.ws.closed

async def close(self, code):
await self.ws.close(code)


class AiohttpSubscriptionServer(BaseSubscriptionServer):

def get_graphql_params(self, *args, **kwargs):
params = super(AiohttpSubscriptionServer,
self).get_graphql_params(*args, **kwargs)
return dict(params, executor=AsyncioExecutor())

async def handle(self, ws):
connection_context = AiohttpConnectionContext(ws)
await self.on_open(connection_context)
while True:
try:
if connection_context.closed:
raise ConnectionClosedException()
message = await connection_context.receive()
except ConnectionClosedException:
self.on_close(connection_context)
return

ensure_future(self.on_message(connection_context, message))

async def on_open(self, connection_context):
pass

def on_close(self, connection_context):
remove_operations = list(connection_context.operations.keys())
for op_id in remove_operations:
self.unsubscribe(connection_context, op_id)

async def on_connect(self, connection_context, payload):
pass

async def on_connection_init(self, connection_context, op_id, payload):
try:
await self.on_connect(connection_context, payload)
await self.send_message(connection_context, op_type=GQL_CONNECTION_ACK)

# if self.keep_alive:
# await self.send_message(connection_context,
# op_type=GQL_CONNECTION_KEEP_ALIVE)
except Exception as e:
await self.send_error(connection_context, op_id, e, GQL_CONNECTION_ERROR)
await connection_context.close(1011)

async def on_connection_terminate(self, connection_context, op_id):
await connection_context.close(1011)

async def on_start(self, connection_context, op_id, params):
execution_result = self.execute(return_promise=True, **params)

if isawaitable(execution_result):
execution_result = await execution_result

if not hasattr(execution_result, '__aiter__'):
await self.send_execution_result(connection_context, op_id, execution_result)
else:
iterator = await execution_result.__aiter__()
connection_context.register_operation(op_id, iterator)
async for single_result in iterator:
if not connection_context.has_operation(op_id):
break
await self.send_execution_result(connection_context, op_id, single_result)
await self.send_message(connection_context, op_id, GQL_COMPLETE)

async def on_stop(self, connection_context, op_id):
self.unsubscribe(connection_context, op_id)
Loading

0 comments on commit 927ca27

Please sign in to comment.