From 77503b366f6e00cc043f599d2d29f9b9971a57b1 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 27 Feb 2026 10:42:39 +0100 Subject: [PATCH 1/2] Fix RabbitMQ queue binding and Gateway GetOriginalStream RabbitMQ: QueueBind used SubscriptionId instead of the resolved queue name, breaking message delivery when QueueOptions.Queue was overridden. Gateway: GetOriginalStream() retrieved the header as System.IO.Stream instead of StreamName, so it always returned null. Co-Authored-By: Claude Opus 4.6 --- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs | 4 ++-- .../Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs index 53c2aa177..972c48239 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs @@ -30,8 +30,8 @@ public static Metadata GetContextMeta(IMessageConsumeContext context) { [PublicAPI] public static class ProducedMessageExtensions { extension(ProducedMessage message) { - public Stream? GetOriginalStream() - => message.AdditionalHeaders?.Get(GatewayContextItems.OriginalStream); + public StreamName? GetOriginalStream() + => message.AdditionalHeaders?.Get(GatewayContextItems.OriginalStream); public object? GetOriginalMessage() => message.AdditionalHeaders?.Get(GatewayContextItems.OriginalMessage); diff --git a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs index df6d5eaf3..df39635c7 100644 --- a/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs +++ b/src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs @@ -123,10 +123,10 @@ protected override ValueTask Subscribe(CancellationToken cancellationToken) { Options.QueueOptions.Arguments ); - Log.InfoLog?.Log("Binding exchange {Exchange} to queue {Queue}", exchange, Options.SubscriptionId); + Log.InfoLog?.Log("Binding exchange {Exchange} to queue {Queue}", exchange, queue); _channel.QueueBind( - Options.SubscriptionId, + queue, exchange, Options.BindingOptions.RoutingKey, Options.BindingOptions.Arguments From ba7ba9c1e034777a17cb4f3058dd1b0e0eb148b3 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Fri, 27 Feb 2026 10:53:40 +0100 Subject: [PATCH 2/2] Add tests for RabbitMQ custom queue binding and Gateway GetOriginalStream RabbitMQ: integration test that uses a custom QueueOptions.Queue name different from SubscriptionId, verifying messages are still delivered. Gateway: unit tests for ProducedMessageExtensions verifying that GetOriginalStream returns StreamName (not System.IO.Stream), plus tests for other header accessors and null-header edge case. Co-Authored-By: Claude Opus 4.6 --- .../GatewayMetaTests.cs | 64 +++++++++++++++++ .../CustomQueueSubscriptionSpec.cs | 72 +++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 src/Gateway/test/Eventuous.Tests.Gateway/GatewayMetaTests.cs create mode 100644 src/RabbitMq/test/Eventuous.Tests.RabbitMq/CustomQueueSubscriptionSpec.cs diff --git a/src/Gateway/test/Eventuous.Tests.Gateway/GatewayMetaTests.cs b/src/Gateway/test/Eventuous.Tests.Gateway/GatewayMetaTests.cs new file mode 100644 index 000000000..01f226d27 --- /dev/null +++ b/src/Gateway/test/Eventuous.Tests.Gateway/GatewayMetaTests.cs @@ -0,0 +1,64 @@ +using Eventuous.Gateway; +using Eventuous.Producers; + +namespace Eventuous.Tests.Gateway; + +public class GatewayMetaTests { + [Test] + public async Task GetOriginalStream_ReturnsStreamName() { + var streamName = new StreamName("Test-123"); + + var headers = new Metadata(new Dictionary { + [GatewayContextItems.OriginalStream] = streamName + }); + + var message = new ProducedMessage("test", null, headers); + + var result = message.GetOriginalStream(); + + await Assert.That(result).IsNotNull(); + await Assert.That(result!.Value).IsEqualTo(streamName); + } + + [Test] + public async Task GetOriginalMessageId_ReturnsMessageId() { + var messageId = Guid.NewGuid().ToString(); + + var headers = new Metadata(new Dictionary { + [GatewayContextItems.OriginalMessageId] = messageId + }); + + var message = new ProducedMessage("test", null, headers); + + await Assert.That(message.GetOriginalMessageId()).IsEqualTo(messageId); + } + + [Test] + public async Task GetOriginalMessageType_ReturnsMessageType() { + var headers = new Metadata(new Dictionary { + [GatewayContextItems.OriginalMessageType] = "test-event" + }); + + var message = new ProducedMessage("test", null, headers); + + await Assert.That(message.GetOriginalMessageType()).IsEqualTo("test-event"); + } + + [Test] + public async Task GetOriginalStreamPosition_ReturnsPosition() { + var headers = new Metadata(new Dictionary { + [GatewayContextItems.OriginalStreamPosition] = 42UL + }); + + var message = new ProducedMessage("test", null, headers); + + await Assert.That(message.GetOriginalStreamPosition()).IsEqualTo(42UL); + } + + [Test] + public async Task GetOriginalStream_WithNullHeaders_ReturnsNull() { + var message = new ProducedMessage("test", null); + + await Assert.That(message.GetOriginalStream()).IsNull(); + } +} diff --git a/src/RabbitMq/test/Eventuous.Tests.RabbitMq/CustomQueueSubscriptionSpec.cs b/src/RabbitMq/test/Eventuous.Tests.RabbitMq/CustomQueueSubscriptionSpec.cs new file mode 100644 index 000000000..231b6cc61 --- /dev/null +++ b/src/RabbitMq/test/Eventuous.Tests.RabbitMq/CustomQueueSubscriptionSpec.cs @@ -0,0 +1,72 @@ +using Eventuous.Producers; +using Eventuous.RabbitMq.Producers; +using Eventuous.RabbitMq.Subscriptions; +using Eventuous.Subscriptions.Filters; +using Eventuous.TestHelpers.TUnit; +using Eventuous.TestHelpers.TUnit.Logging; +using Eventuous.Tests.Subscriptions.Base; + +namespace Eventuous.Tests.RabbitMq; + +[ClassDataSource] +public class CustomQueueSubscriptionSpec { + static CustomQueueSubscriptionSpec() => TypeMap.Instance.RegisterKnownEventTypes(typeof(TestEvent).Assembly); + + RabbitMqProducer _producer = null!; + TestEventHandler _handler = null!; +#pragma warning disable TUnit0023 + RabbitMqSubscription _subscription = null!; + TestEventListener _es = null!; +#pragma warning restore TUnit0023 + readonly StreamName _exchange; + readonly ILogger _log; + readonly ILoggerFactory _loggerFactory; + readonly RabbitMqFixture _fixture; + + public CustomQueueSubscriptionSpec(RabbitMqFixture fixture) { + _fixture = fixture; + _exchange = new(Guid.NewGuid().ToString()); + _loggerFactory = LoggingExtensions.GetLoggerFactory(); + _log = _loggerFactory.CreateLogger(); + } + + [Test] + public async Task SubscribeWithCustomQueueName(CancellationToken cancellationToken) { + var testEvent = TestEvent.Create(); + await _producer.Produce(_exchange, testEvent, new(), cancellationToken: cancellationToken); + await _handler.AssertThat().Timebox(10.Seconds()).Any().Match(x => x as TestEvent == testEvent).Validate(cancellationToken); + } + + [Before(Test)] + public async ValueTask InitializeAsync() { + _es = new(); + _handler = new(); + _producer = new(_fixture.ConnectionFactory); + + var subscriptionId = Guid.NewGuid().ToString(); + var customQueue = Guid.NewGuid().ToString(); + + _subscription = new( + _fixture.ConnectionFactory, + new RabbitMqSubscriptionOptions { + ConcurrencyLimit = 10, + SubscriptionId = subscriptionId, + Exchange = _exchange, + ThrowOnError = true, + QueueOptions = new RabbitMqSubscriptionOptions.RabbitMqQueueOptions { Queue = customQueue } + }, + new ConsumePipe().AddDefaultConsumer(_handler), + _loggerFactory + ); + await _subscription.SubscribeWithLog(_log); + await _producer.StartAsync(); + } + + [After(Test)] + public async ValueTask DisposeAsync() { + await _producer.StopAsync(); + await _subscription.UnsubscribeWithLog(_log); + _es.Dispose(); + await _subscription.DisposeAsync(); + } +}