GitXplorerGitXplorer
a

msgapp

public
3 stars
0 forks
2 issues

Commits

List of commits on branch main.
Verified
a711c82a5a0344872eced280402cd4695d7c845a

session scope and fail fast

aadriangb committed 2 years ago
Verified
bf2ac1a8abde132780435c54a5dfa6c3363f59cd

fix ack deadlines extension for pub/sub

aadriangb committed 2 years ago
Verified
f620fd88a6dafa658761e1036bfe687906577204

support stringified annotations

aadriangb committed 2 years ago
Verified
993a9154ce6ce92576820a57e8384fadcca14585

extend timeout for tests

aadriangb committed 2 years ago
Verified
fdf9adda4ae82ec9a65b3a2390e1df0c3b66790c

lower required permissions for pubsub

aadriangb committed 2 years ago
Verified
a51f0c038803a7dd056590b154f0577902f450a7

add stub file

aadriangb committed 2 years ago

README

The README file for this repository.

msgapp: declarative message driven applications

msgapp helps you write event consuming applications with minimal boilerplate. It abstracts away some of the fiddly details of dealing with messaging queues like acks, deadlines and parsing. The design is focused on flexibility and testability, offering the ability to swap out event backends (currently only PubSub) and support multiple parsers (only JSON via Pydantic is supplied out of the box for now).

Examples

Pydantic + PubSub

import anyio
from pydantic import BaseModel
from msgapp import App
from msgapp.producers.pubsub import PubSubQueue
from msgapp.parsers.json import PydanticParserFactory

class MyModel(BaseModel):
    foo: str
    baz: int

async def handler(model: MyModel) -> None:
    # do something with the model
    print(model)
    # return to ack/consume the model
    # raise an exception to signal an error
    # and let the queue handle redelivery

app = App(
    handler,
    producer=PubSubQueue(subscription="projects/demo/subscriptions/foo-bar"),
    parser=PydanticParserFactory(),
)

anyio.run(app.run)

Redis Streams + Pydantic

We do not include a Redis implementation simply because there are many ways that redis can be used for messaging. For example, you may use Redis' PubSub functionality for fire and forget messaging or Streams for reliable Kafka-like operation.

Below is an example implementation using Redis streams. While this may not be exactly the implementation you want, it should give you some idea of how to write a Redis producer.

from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
    Any,
    AsyncContextManager,
    AsyncIterator,
    Mapping,
    Optional,
    Sequence,
    Tuple,
)

from redis.asyncio import Redis
from redis.exceptions import ResponseError

from msgapp._producer import Producer


@dataclass(frozen=True)
class RedisMessage:
    payload: Mapping[bytes, bytes]
    id: bytes


class RedisWrappedEnvelope:
    def __init__(self, message: RedisMessage, body: bytes) -> None:
        self._message = message
        self._body = body

    @property
    def body(self) -> bytes:
        return self._body

    @property
    def message(self) -> RedisMessage:
        return self._message


class RedisProducer(Producer[Any]):
    def __init__(
        self,
        client: "Redis[Any]",
        stream: str,
        group: str,
        message_key: bytes,
        consumer_name: str,
        batch_size: int = 10,
        poll_interval: int = 30,
    ) -> None:
        self._client = client
        self._stream = stream
        self._group = group
        self._batch_size = batch_size
        self._poll_interval = poll_interval
        self._message_key = message_key
        self._consumer_name = consumer_name

    async def pull(self) -> AsyncIterator[AsyncContextManager[RedisWrappedEnvelope]]:
        try:
            await self._client.xgroup_create(
                name=self._stream, groupname=self._group, mkstream=True
            )
        except ResponseError as e:
            if "Consumer Group name already exists" in e.args[0]:
                pass
            else:
                raise
        last_id: Optional[bytes] = None
        items: Optional[
            Sequence[Tuple[str, Sequence[Tuple[bytes, Mapping[bytes, bytes]]]]]
        ] = None
        while True:
            items = await self._client.xreadgroup(
                groupname=self._group,
                consumername=self._consumer_name,
                streams={self._stream: last_id or ">"},
                block=1,
                count=1,
            )
            if not items:
                continue
            stream_items = next(iter(items))
            if len(stream_items[1]) == 0:
                last_id = None
                continue
            _, stream_messages = stream_items
            for message_id, values in stream_messages:
                last_id = message_id

                wrapped_msg = RedisMessage(payload=values, id=message_id)
                wrapped_envelope = RedisWrappedEnvelope(
                    wrapped_msg, values[self._message_key]
                )

                @asynccontextmanager
                async def msg_wrapper(
                    envelope: RedisWrappedEnvelope = wrapped_envelope,
                ) -> AsyncIterator[RedisWrappedEnvelope]:
                    yield envelope
                    await self._client.xack(  # type: ignore
                        self._stream, self._group, envelope.message.id
                    )

                yield msg_wrapper()


if __name__ == "__main__":
    import anyio
    from pydantic import BaseModel

    from msgapp import App
    from msgapp.parsers.json import PydanticParserFactory

    class MyModel(BaseModel):
        foo: str

    async def handler(message: MyModel) -> None:
        print(repr(message))

    stream = "mystream"  # str(uuid4())

    async def main() -> None:
        client = Redis.from_url("redis://localhost")
        producer = RedisProducer(client, stream, "mygroup", b"message", "consumer")

        app = App(handler, parser=PydanticParserFactory(), producer=producer)

        async with anyio.create_task_group() as tg:
            tg.start_soon(app.run)
            await client.xadd(stream, {b"message": b'{"foo": "bar"}'})
            await client.xadd(stream, {b"message": b'{"foo": "baz"}'})
            await anyio.sleep(1)
            tg.cancel_scope.cancel()

    anyio.run(main)