void-packages/srcpkgs/python3-aioamqp/patches/02-pamqp-3.1.0.patch

470 lines
20 KiB
Diff

From bb59736dc0127f3bd5b632d0531058f7d98384f1 Mon Sep 17 00:00:00 2001
From: dzen <benoit.calvez@polyconseil.fr>
Date: Thu, 12 Mar 2020 08:32:29 +0100
Subject: [PATCH 2/2] Require pamqp 3.1.0

Combines the following upstream aioamqp commits:
- 6de2e1a
- 95cb160
- 9f08434
---
aioamqp/channel.py | 114 +++++++++++++++++++-------------------
aioamqp/frame.py | 1 -
aioamqp/properties.py | 6 +-
aioamqp/protocol.py | 22 ++++----
aioamqp/tests/testcase.py | 2 +-
setup.py | 3 +-
6 files changed, 76 insertions(+), 72 deletions(-)
diff --git a/aioamqp/channel.py b/aioamqp/channel.py
index b58bd5a..b87392d 100644
--- a/aioamqp/channel.py
+++ b/aioamqp/channel.py
@@ -9,7 +9,7 @@ import io
from itertools import count
import warnings
-import pamqp.specification
+import pamqp.commands
from . import frame as amqp_frame
from . import exceptions
@@ -78,35 +78,35 @@ class Channel:
async def dispatch_frame(self, frame):
methods = {
- pamqp.specification.Channel.OpenOk.name: self.open_ok,
- pamqp.specification.Channel.FlowOk.name: self.flow_ok,
- pamqp.specification.Channel.CloseOk.name: self.close_ok,
- pamqp.specification.Channel.Close.name: self.server_channel_close,
-
- pamqp.specification.Exchange.DeclareOk.name: self.exchange_declare_ok,
- pamqp.specification.Exchange.BindOk.name: self.exchange_bind_ok,
- pamqp.specification.Exchange.UnbindOk.name: self.exchange_unbind_ok,
- pamqp.specification.Exchange.DeleteOk.name: self.exchange_delete_ok,
-
- pamqp.specification.Queue.DeclareOk.name: self.queue_declare_ok,
- pamqp.specification.Queue.DeleteOk.name: self.queue_delete_ok,
- pamqp.specification.Queue.BindOk.name: self.queue_bind_ok,
- pamqp.specification.Queue.UnbindOk.name: self.queue_unbind_ok,
- pamqp.specification.Queue.PurgeOk.name: self.queue_purge_ok,
-
- pamqp.specification.Basic.QosOk.name: self.basic_qos_ok,
- pamqp.specification.Basic.ConsumeOk.name: self.basic_consume_ok,
- pamqp.specification.Basic.CancelOk.name: self.basic_cancel_ok,
- pamqp.specification.Basic.GetOk.name: self.basic_get_ok,
- pamqp.specification.Basic.GetEmpty.name: self.basic_get_empty,
- pamqp.specification.Basic.Deliver.name: self.basic_deliver,
- pamqp.specification.Basic.Cancel.name: self.server_basic_cancel,
- pamqp.specification.Basic.Ack.name: self.basic_server_ack,
- pamqp.specification.Basic.Nack.name: self.basic_server_nack,
- pamqp.specification.Basic.RecoverOk.name: self.basic_recover_ok,
- pamqp.specification.Basic.Return.name: self.basic_return,
-
- pamqp.specification.Confirm.SelectOk.name: self.confirm_select_ok,
+ pamqp.commands.Channel.OpenOk.name: self.open_ok,
+ pamqp.commands.Channel.FlowOk.name: self.flow_ok,
+ pamqp.commands.Channel.CloseOk.name: self.close_ok,
+ pamqp.commands.Channel.Close.name: self.server_channel_close,
+
+ pamqp.commands.Exchange.DeclareOk.name: self.exchange_declare_ok,
+ pamqp.commands.Exchange.BindOk.name: self.exchange_bind_ok,
+ pamqp.commands.Exchange.UnbindOk.name: self.exchange_unbind_ok,
+ pamqp.commands.Exchange.DeleteOk.name: self.exchange_delete_ok,
+
+ pamqp.commands.Queue.DeclareOk.name: self.queue_declare_ok,
+ pamqp.commands.Queue.DeleteOk.name: self.queue_delete_ok,
+ pamqp.commands.Queue.BindOk.name: self.queue_bind_ok,
+ pamqp.commands.Queue.UnbindOk.name: self.queue_unbind_ok,
+ pamqp.commands.Queue.PurgeOk.name: self.queue_purge_ok,
+
+ pamqp.commands.Basic.QosOk.name: self.basic_qos_ok,
+ pamqp.commands.Basic.ConsumeOk.name: self.basic_consume_ok,
+ pamqp.commands.Basic.CancelOk.name: self.basic_cancel_ok,
+ pamqp.commands.Basic.GetOk.name: self.basic_get_ok,
+ pamqp.commands.Basic.GetEmpty.name: self.basic_get_empty,
+ pamqp.commands.Basic.Deliver.name: self.basic_deliver,
+ pamqp.commands.Basic.Cancel.name: self.server_basic_cancel,
+ pamqp.commands.Basic.Ack.name: self.basic_server_ack,
+ pamqp.commands.Basic.Nack.name: self.basic_server_nack,
+ pamqp.commands.Basic.RecoverOk.name: self.basic_recover_ok,
+ pamqp.commands.Basic.Return.name: self.basic_return,
+
+ pamqp.commands.Confirm.SelectOk.name: self.confirm_select_ok,
}
if frame.name not in methods:
@@ -144,7 +144,7 @@ class Channel:
async def open(self):
"""Open the channel on the server."""
- request = pamqp.specification.Channel.Open()
+ request = pamqp.commands.Channel.Open()
return (await self._write_frame_awaiting_response(
'open', self.channel_id, request, no_wait=False, check_open=False))
@@ -159,7 +159,7 @@ class Channel:
if not self.is_open:
raise exceptions.ChannelClosed("channel already closed or closing")
self.close_event.set()
- request = pamqp.specification.Channel.Close(reply_code, reply_text, class_id=0, method_id=0)
+ request = pamqp.commands.Channel.Close(reply_code, reply_text, class_id=0, method_id=0)
return (await self._write_frame_awaiting_response(
'close', self.channel_id, request, no_wait=False, check_open=False))
@@ -169,7 +169,7 @@ class Channel:
self.protocol.release_channel_id(self.channel_id)
async def _send_channel_close_ok(self):
- request = pamqp.specification.Channel.CloseOk()
+ request = pamqp.commands.Channel.CloseOk()
await self._write_frame(self.channel_id, request)
async def server_channel_close(self, frame):
@@ -183,7 +183,7 @@ class Channel:
self.connection_closed(results['reply_code'], results['reply_text'])
async def flow(self, active):
- request = pamqp.specification.Channel.Flow(active)
+ request = pamqp.commands.Channel.Flow(active)
return (await self._write_frame_awaiting_response(
'flow', self.channel_id, request, no_wait=False,
check_open=False))
@@ -201,7 +201,7 @@ class Channel:
async def exchange_declare(self, exchange_name, type_name, passive=False, durable=False,
auto_delete=False, no_wait=False, arguments=None):
- request = pamqp.specification.Exchange.Declare(
+ request = pamqp.commands.Exchange.Declare(
exchange=exchange_name,
exchange_type=type_name,
passive=passive,
@@ -222,7 +222,7 @@ class Channel:
return future
async def exchange_delete(self, exchange_name, if_unused=False, no_wait=False):
- request = pamqp.specification.Exchange.Delete(exchange=exchange_name, if_unused=if_unused, nowait=no_wait)
+ request = pamqp.commands.Exchange.Delete(exchange=exchange_name, if_unused=if_unused, nowait=no_wait)
return await self._write_frame_awaiting_response(
'exchange_delete', self.channel_id, request, no_wait)
@@ -235,7 +235,7 @@ class Channel:
no_wait=False, arguments=None):
if arguments is None:
arguments = {}
- request = pamqp.specification.Exchange.Bind(
+ request = pamqp.commands.Exchange.Bind(
destination=exchange_destination,
source=exchange_source,
routing_key=routing_key,
@@ -255,7 +255,7 @@ class Channel:
if arguments is None:
arguments = {}
- request = pamqp.specification.Exchange.Unbind(
+ request = pamqp.commands.Exchange.Unbind(
destination=exchange_destination,
source=exchange_source,
routing_key=routing_key,
@@ -297,7 +297,7 @@ class Channel:
if not queue_name:
queue_name = 'aioamqp.gen-' + str(uuid.uuid4())
- request = pamqp.specification.Queue.Declare(
+ request = pamqp.commands.Queue.Declare(
queue=queue_name,
passive=passive,
durable=durable,
@@ -327,7 +327,7 @@ class Channel:
if_empty: bool, the queue is deleted if it has no messages. Raise if not.
no_wait: bool, if set, the server will not respond to the method
"""
- request = pamqp.specification.Queue.Delete(
+ request = pamqp.commands.Queue.Delete(
queue=queue_name,
if_unused=if_unused,
if_empty=if_empty,
@@ -346,7 +346,7 @@ class Channel:
if arguments is None:
arguments = {}
- request = pamqp.specification.Queue.Bind(
+ request = pamqp.commands.Queue.Bind(
queue=queue_name,
exchange=exchange_name,
routing_key=routing_key,
@@ -367,7 +367,7 @@ class Channel:
if arguments is None:
arguments = {}
- request = pamqp.specification.Queue.Unbind(
+ request = pamqp.commands.Queue.Unbind(
queue=queue_name,
exchange=exchange_name,
routing_key=routing_key,
@@ -383,7 +383,7 @@ class Channel:
logger.debug("Queue unbound")
async def queue_purge(self, queue_name, no_wait=False):
- request = pamqp.specification.Queue.Purge(
+ request = pamqp.commands.Queue.Purge(
queue=queue_name, nowait=no_wait
)
return (await self._write_frame_awaiting_response(
@@ -406,7 +406,7 @@ class Channel:
if properties is None:
properties = {}
- method_request = pamqp.specification.Basic.Publish(
+ method_request = pamqp.commands.Basic.Publish(
exchange=exchange_name,
routing_key=routing_key,
mandatory=mandatory,
@@ -417,7 +417,7 @@ class Channel:
header_request = pamqp.header.ContentHeader(
body_size=len(payload),
- properties=pamqp.specification.Basic.Properties(**properties)
+ properties=pamqp.commands.Basic.Properties(**properties)
)
await self._write_frame(self.channel_id, header_request, drain=False)
@@ -446,7 +446,7 @@ class Channel:
settings should apply per-consumer channel; and global=true to mean
that the QoS settings should apply per-channel.
"""
- request = pamqp.specification.Basic.Qos(
+ request = pamqp.commands.Basic.Qos(
prefetch_size, prefetch_count, connection_global
)
return (await self._write_frame_awaiting_response(
@@ -490,7 +490,7 @@ class Channel:
if arguments is None:
arguments = {}
- request = pamqp.specification.Basic.Consume(
+ request = pamqp.commands.Basic.Consume(
queue=queue_name,
consumer_tag=consumer_tag,
no_local=no_local,
@@ -561,7 +561,7 @@ class Channel:
callback, error)
async def basic_cancel(self, consumer_tag, no_wait=False):
- request = pamqp.specification.Basic.Cancel(consumer_tag, no_wait)
+ request = pamqp.commands.Basic.Cancel(consumer_tag, no_wait)
return (await self._write_frame_awaiting_response(
'basic_cancel', self.channel_id, request, no_wait=no_wait)
)
@@ -575,7 +575,7 @@ class Channel:
logger.debug("Cancel ok")
async def basic_get(self, queue_name='', no_ack=False):
- request = pamqp.specification.Basic.Get(queue=queue_name, no_ack=no_ack)
+ request = pamqp.commands.Basic.Get(queue=queue_name, no_ack=no_ack)
return (await self._write_frame_awaiting_response(
'basic_get', self.channel_id, request, no_wait=False)
)
@@ -606,11 +606,11 @@ class Channel:
future.set_exception(exceptions.EmptyQueue)
async def basic_client_ack(self, delivery_tag, multiple=False):
- request = pamqp.specification.Basic.Ack(delivery_tag, multiple)
+ request = pamqp.commands.Basic.Ack(delivery_tag, multiple)
await self._write_frame(self.channel_id, request)
async def basic_client_nack(self, delivery_tag, multiple=False, requeue=True):
- request = pamqp.specification.Basic.Nack(delivery_tag, multiple, requeue)
+ request = pamqp.commands.Basic.Nack(delivery_tag, multiple, requeue)
await self._write_frame(self.channel_id, request)
async def basic_server_ack(self, frame):
@@ -620,15 +620,15 @@ class Channel:
fut.set_result(True)
async def basic_reject(self, delivery_tag, requeue=False):
- request = pamqp.specification.Basic.Reject(delivery_tag, requeue)
+ request = pamqp.commands.Basic.Reject(delivery_tag, requeue)
await self._write_frame(self.channel_id, request)
async def basic_recover_async(self, requeue=True):
- request = pamqp.specification.Basic.RecoverAsync(requeue)
+ request = pamqp.commands.Basic.RecoverAsync(requeue)
await self._write_frame(self.channel_id, request)
async def basic_recover(self, requeue=True):
- request = pamqp.specification.Basic.Recover(requeue)
+ request = pamqp.commands.Basic.Recover(requeue)
return (await self._write_frame_awaiting_response(
'basic_recover', self.channel_id, request, no_wait=False)
)
@@ -681,7 +681,7 @@ class Channel:
delivery_tag = next(self.delivery_tag_iter) # pylint: disable=stop-iteration-return
fut = self._set_waiter('basic_server_ack_{}'.format(delivery_tag))
- method_request = pamqp.specification.Basic.Publish(
+ method_request = pamqp.commands.Basic.Publish(
exchange=exchange_name,
routing_key=routing_key,
mandatory=mandatory,
@@ -689,7 +689,7 @@ class Channel:
)
await self._write_frame(self.channel_id, method_request, drain=False)
- properties = pamqp.specification.Basic.Properties(**properties)
+ properties = pamqp.commands.Basic.Properties(**properties)
header_request = pamqp.header.ContentHeader(
body_size=len(payload), properties=properties
)
@@ -710,7 +710,7 @@ class Channel:
async def confirm_select(self, *, no_wait=False):
if self.publisher_confirms:
raise ValueError('publisher confirms already enabled')
- request = pamqp.specification.Confirm.Select(nowait=no_wait)
+ request = pamqp.commands.Confirm.Select(nowait=no_wait)
return (await self._write_frame_awaiting_response(
'confirm_select', self.channel_id, request, no_wait)
diff --git a/aioamqp/frame.py b/aioamqp/frame.py
index d70cfd7..af27ab5 100644
--- a/aioamqp/frame.py
+++ b/aioamqp/frame.py
@@ -42,7 +42,6 @@ import asyncio
import socket
import pamqp.encode
-import pamqp.specification
import pamqp.frame
from . import exceptions
diff --git a/aioamqp/properties.py b/aioamqp/properties.py
index 56a3484..4232040 100644
--- a/aioamqp/properties.py
+++ b/aioamqp/properties.py
@@ -1,4 +1,6 @@
# pylint: disable=redefined-builtin
+import datetime
+
from .constants import MESSAGE_PROPERTIES
@@ -37,7 +39,9 @@ def from_pamqp(instance):
props.reply_to = instance.reply_to
props.expiration = instance.expiration
props.message_id = instance.message_id
- props.timestamp = instance.timestamp
+ if instance.timestamp is not None:
+ # pamqp uses naive datetimes representing UTC, let's use TZ-aware datetimes
+ props.timestamp = instance.timestamp.replace(tzinfo=datetime.timezone.utc)
props.message_type = instance.message_type
props.user_id = instance.user_id
props.app_id = instance.app_id
diff --git a/aioamqp/protocol.py b/aioamqp/protocol.py
index 4938833..6465400 100644
--- a/aioamqp/protocol.py
+++ b/aioamqp/protocol.py
@@ -5,9 +5,9 @@
import asyncio
import logging
+import pamqp.commands
import pamqp.frame
import pamqp.heartbeat
-import pamqp.specification
from . import channel as amqp_channel
from . import constants as amqp_constants
@@ -159,7 +159,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
"""Close connection (and all channels)"""
await self.ensure_open()
self.state = CLOSING
- request = pamqp.specification.Connection.Close(
+ request = pamqp.commands.Connection.Close(
reply_code=0,
reply_text='',
class_id=0,
@@ -251,11 +251,11 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
"""Dispatch the received frame to the corresponding handler"""
method_dispatch = {
- pamqp.specification.Connection.Close.name: self.server_close,
- pamqp.specification.Connection.CloseOk.name: self.close_ok,
- pamqp.specification.Connection.Tune.name: self.tune,
- pamqp.specification.Connection.Start.name: self.start,
- pamqp.specification.Connection.OpenOk.name: self.open_ok,
+ pamqp.commands.Connection.Close.name: self.server_close,
+ pamqp.commands.Connection.CloseOk.name: self.close_ok,
+ pamqp.commands.Connection.Tune.name: self.tune,
+ pamqp.commands.Connection.Start.name: self.start,
+ pamqp.commands.Connection.OpenOk.name: self.open_ok,
}
if frame_channel is None and frame is None:
frame_channel, frame = await self.get_frame()
@@ -392,7 +392,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
def credentials():
return '\0{LOGIN}\0{PASSWORD}'.format(**auth)
- request = pamqp.specification.Connection.StartOk(
+ request = pamqp.commands.Connection.StartOk(
client_properties=client_properties,
mechanism=mechanism,
locale=locale,
@@ -420,7 +420,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
self._stream_writer.close()
async def _close_ok(self):
- request = pamqp.specification.Connection.CloseOk()
+ request = pamqp.commands.Connection.CloseOk()
await self._write_frame(0, request)
async def tune(self, frame):
@@ -429,7 +429,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
self.server_heartbeat = frame.heartbeat
async def tune_ok(self, channel_max, frame_max, heartbeat):
- request = pamqp.specification.Connection.TuneOk(
+ request = pamqp.commands.Connection.TuneOk(
channel_max, frame_max, heartbeat
)
await self._write_frame(0, request)
@@ -439,7 +439,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol):
async def open(self, virtual_host, capabilities='', insist=False):
"""Open connection to virtual host."""
- request = pamqp.specification.Connection.Open(
+ request = pamqp.commands.Connection.Open(
virtual_host, capabilities, insist
)
await self._write_frame(0, request)
diff --git a/aioamqp/tests/testcase.py b/aioamqp/tests/testcase.py
index 120104b..d6d702b 100644
--- a/aioamqp/tests/testcase.py
+++ b/aioamqp/tests/testcase.py
@@ -147,7 +147,7 @@ class RabbitTestCaseMixin:
if amqp is None:
amqp = self.amqp
- server_version = tuple(int(x) for x in amqp.server_properties['version'].decode().split('.'))
+ server_version = tuple(int(x) for x in amqp.server_properties['version'].split('.'))
return server_version
async def check_exchange_exists(self, exchange_name):
diff --git a/setup.py b/setup.py
index ea0e79f..1d0e923 100644
--- a/setup.py
+++ b/setup.py
@@ -25,8 +25,9 @@ setuptools.setup(
'aioamqp',
],
install_requires=[
- 'pamqp>=2.2.0,<3',
+ 'pamqp>=3.1.0',
],
+ python_requires=">=3.6",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
--
2.34.1