Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lost data for subscription #42

Open
hashbash opened this issue Jul 16, 2020 · 0 comments
Open

Lost data for subscription #42

hashbash opened this issue Jul 16, 2020 · 0 comments

Comments

@hashbash
Copy link

Hello!
I'm new to NATS/STAN, and sometimes I lose part of the messages from NATS.
It is approximately 10 minutes of data every day (all messages within a single 10-minute interval).
I'm running this Python code as a systemd service.

Any idea why it happens?

import traceback
import signal
import asyncio
import logging
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime

from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN


# NATS server URLs. The scheme must be ``nats://``: a string such as
# ``nats:https://host:4222`` parses as scheme "nats" with an empty netloc
# (the whole "https://host:4222" ends up in the path), so no host/port can be
# derived from it.
NATS_SERVERS = [
    "nats://positionaq1.dp.some_server.ru:4222",
    "nats://positionaq2.dp.some_server.ru:4222",
]

# Subject, streaming cluster and client identity used by run().
NATS_SUBJECT = 'positions.changes_by_events'
NATS_CLUSTER_ID = 'gb_analytics'
NATS_CLIENT_ID = 'olap-orr-nats-listener-service'
# Seconds; passed as connect_timeout to both the NATS and the STAN connect.
NATS_CONNECT_TIMEOUT = 20
# Passed to the NATS client as max_reconnect_attempts. Once exhausted the
# connection is closed, the closed callback stops the loop and the process
# exits (systemd then restarts it).
NATS_MAX_RECONNECT_ATTEMPTS = 5


# Output files: raw message dump and the service's own log.
nats_data_path = '/opt/nats_data/dp/nats.data'
nats_logs_path = '/var/log/nats/dp/nats.log'


def setup_logger(name, log_path, log_format, when, interval=1, level=logging.INFO):
    """Return the logger called *name*, writing to a time-rotated file.

    A ``TimedRotatingFileHandler`` on *log_path* (rotation governed by *when*
    and *interval*) formatted with *log_format* is attached to the logger,
    and the logger's level is set to *level*.
    """
    rotating = TimedRotatingFileHandler(log_path, when=when, interval=interval)
    rotating.setFormatter(logging.Formatter(log_format))

    target = logging.getLogger(name)
    target.setLevel(level)
    target.addHandler(rotating)
    return target


# Service log: startup parameters plus one summary line per received message.
# NOTE(review): interval=356 looks unusual for a 'midnight' rotation (365?);
# confirm the intent.
base_logger = setup_logger(
    name='Nats logger',
    log_path=nats_logs_path,
    log_format='%(asctime)s %(name)s %(levelname)s %(message)s',
    when='midnight',
    interval=356,
)
# Raw message dump: bare message lines, file rotated every 5 minutes.
data_writer = setup_logger(
    name='Data writer',
    log_path=nats_data_path,
    log_format='%(message)s',
    when='m',
    interval=5,
)


async def run(loop, durable_name=NATS_CLIENT_ID):
    """Connect to NATS and NATS Streaming, then subscribe to NATS_SUBJECT.

    Every received message is printed (visible via journalctl), summarised in
    ``base_logger`` and its payload written to ``data_writer``. SIGINT/SIGTERM
    close the streaming session and the NATS connection; ``closed_cb`` then
    stops *loop* so the process exits.

    Args:
        loop: Event loop the NATS client and the signal handlers are bound to.
        durable_name: Durable name for the streaming subscription. A
            non-durable subscription (``None``) starts with "new messages only"
            every time the service starts, so everything published while the
            process was down or restarting is skipped. With a durable name the
            streaming server resumes delivery after the last acknowledged
            message of that durable.

    Raises:
        Exception: Any error from ``nc.connect``/``sc.connect`` is printed,
            logged and re-raised so the process exits and can be restarted,
            instead of continuing with an unconnected client.
    """
    nc = NATS()
    sc = STAN()

    async def closed_cb():
        print("Connection to NATS is closed.")
        # asyncio.sleep's ``loop`` argument was removed in Python 3.10; the
        # running loop is used implicitly.
        await asyncio.sleep(0.1)
        loop.stop()

    async def reconnected_cb():
        print(f"Reconnected to NATS at {nc.connected_url.netloc}...")

    try:
        await nc.connect(servers=NATS_SERVERS, io_loop=loop, connect_timeout=NATS_CONNECT_TIMEOUT,
                         closed_cb=closed_cb, reconnected_cb=reconnected_cb,
                         max_reconnect_attempts=NATS_MAX_RECONNECT_ATTEMPTS)
        # TODO(review): consider passing a conn_lost_cb here so a streaming
        # session dropped by the server also ends the process; check that the
        # installed stan.py version supports it.
        await sc.connect(NATS_CLUSTER_ID, NATS_CLIENT_ID, nats=nc, connect_timeout=NATS_CONNECT_TIMEOUT)
    except Exception:
        print(traceback.format_exc())
        base_logger.exception('Failed to connect to NATS')
        # Fail loudly: continuing would dereference nc.connected_url (None) or
        # subscribe on an unconnected streaming client.
        raise

    print(f"Connected to NATS at {nc.connected_url.netloc}")

    async def shutdown():
        # Close the streaming session before the underlying NATS connection.
        # Closing (rather than unsubscribing) keeps a durable subscription
        # registered, so the next start resumes where this one stopped.
        try:
            await sc.close()
        except Exception:
            print(traceback.format_exc())
        await nc.close()

    def signal_handler():
        if nc.is_closed:
            return
        print("Disconnecting...")
        loop.create_task(shutdown())

    for sig in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, sig), signal_handler)

    async def cb(msg):
        print(msg)  # available message meta info from journalctl
        received_at = datetime.now().isoformat()  # one timestamp for both lines
        base_logger.info('Message received. sequence: %s. time: %s. data_len: %d',
                         msg.seq, received_at, len(msg.data))
        # NOTE(review): msg.data is bytes, so %s writes its repr (b'...') and
        # the payload is not JSON-escaped; kept unchanged because readers of
        # the data files may depend on this exact format.
        data_writer.info("""{"sequence": "%s", "time": "%s", "data": "%s"}""",
                         msg.seq, received_at, msg.data)

    await sc.subscribe(NATS_SUBJECT, cb=cb, durable_name=durable_name)


def exception_handler(loop, context):
    """Loop-wide exception hook.

    Always delegates to the loop's default handler for reporting. If the
    failure is a timeout or a cancellation, additionally prints the context
    and stops *loop*.
    """
    loop.default_exception_handler(context)
    fatal_types = (asyncio.TimeoutError, asyncio.CancelledError)
    if not isinstance(context.get('exception'), fatal_types):
        return
    print(context)
    loop.stop()


if __name__ == '__main__':
    base_logger.info('Run nats subscriber...')
    # Lazy %-args: the logging module formats the message only if it is emitted.
    base_logger.info('Nats params: NATS_SUBJECT=%s, NATS_CLUSTER_ID=%s, NATS_CLIENT_ID=%s, NATS_CONNECT_TIMEOUT=%d, '
                     'NATS_SERVERS=%s', NATS_SUBJECT, NATS_CLUSTER_ID, NATS_CLIENT_ID,
                     NATS_CONNECT_TIMEOUT, NATS_SERVERS)
    # asyncio.get_event_loop() without a running loop is deprecated on modern
    # Python; create the loop explicitly and install it as the current loop
    # for any library code that still looks it up.
    main_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(main_loop)
    main_loop.set_exception_handler(exception_handler)
    try:
        # Startup failures propagate (non-zero exit, so systemd restarts the
        # service), but the loop is still closed on the way out.
        main_loop.run_until_complete(run(main_loop))
        main_loop.run_forever()
    finally:
        main_loop.close()

And the service file:

[Unit]
# systemd unit running the NATS/STAN subscriber script as a long-lived service.
Description=Nats dp subscriber daemon
# NOTE(review): network.target does not wait for the network to be usable; if
# the first connect fails at boot, consider After=/Wants=network-online.target.
After=network.target

[Service]
Type=simple
WorkingDirectory=/root/orr-nats-subscriber/dp
ExecStart=/usr/bin/python3 /root/orr-nats-subscriber/dp/nats_subscriber.py
#ExecReload=/bin/kill -s SIGHUP $MAINPID
# The script handles SIGTERM by closing the NATS connection; its closed
# callback then stops the event loop and the process exits.
# NOTE(review): systemd already sends SIGTERM on stop by default, so this
# ExecStop line is likely redundant.
ExecStop=/bin/kill -s SIGTERM $MAINPID
#Restart=on-failure
# The script also exits whenever its NATS connection is closed (e.g. after the
# reconnect attempts run out); this brings it back after RestartSec.
# NOTE(review): with a non-durable STAN subscription each restart starts at
# "new messages only", so messages published during the downtime are skipped.
Restart=always
RestartSec=5s

[Install]
# NOTE(review): this section looks truncated here; `systemctl enable` needs a
# target such as WantedBy=multi-user.target. Confirm against the deployed file.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant