-----Original Message-----
From: Daiter, Sophie
Sent: Wednesday, November 01, 2023 13:03
To: users-h...@activemq.apache.org
Subject: RE: WELCOME to users@activemq.apache.org

Hi,

I am using the Apache.NMS.ActiveMQ.NetStd NuGet package, version 1.8.0.

We implemented a producer and a consumer. Everything works fine, but a few times a day we encounter strange behavior: "producer.Send(message)" finishes successfully, but the consumer doesn't get the message.

Could you help me figure out why this might happen? Could the queue itself be losing messages? Or could producer.Send() be swallowing an exception, so that the send does not actually succeed?

I have attached the producer and consumer we implemented.

Please advise.

Sophie

-----Original Message-----
From: users-h...@activemq.apache.org <users-h...@activemq.apache.org>
Sent: Wednesday, November 01, 2023 12:51
To: Daiter, Sophie <sophie.dai...@intel.com>
Subject: WELCOME to users@activemq.apache.org

Hi! This is the ezmlm program. I'm managing the users@activemq.apache.org mailing list. I'm working for my owner, who can be reached at users-ow...@activemq.apache.org.

Acknowledgment: I have added the address sophie.dai...@intel.com to the users mailing list.

Welcome to users@activemq.apache.org!

Please save this message so that you know the address you are subscribed under, in case you later want to unsubscribe or change your subscription address.

--- Administrative commands for the users list ---

I can handle administrative requests automatically. Please do not send them to the list address!

using System;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using CSharpFunctionalExtensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrchestratorService.Configuration;
using OrchestratorService.Logging;
using IAmqMessage = Apache.NMS.IMessage;

namespace OrchestratorService.Queue.ActiveMq
{
    public class ActiveMqConsumer<T> : IQueueConsumer where T : IMessage
    {
        private readonly ISparkLogger<ActiveMqConsumer<T>> _logger;
        private readonly IOptions<QueueOptions> _queueOptions;
        private Apache.NMS.IMessageConsumer _consumer;
        private Func<string, Task> _onMessage;
        private IConnection _connection;
        private ISession _session;

        public ActiveMqConsumer(ISparkLogger<ActiveMqConsumer<T>> logger, IOptions<QueueOptions> queueOptions)
        {
            _logger = logger;
            _queueOptions = queueOptions;
        }

        public Result StartConsuming(Func<string, Task> onMessage)
        {
            var config = _queueOptions.Value;
            _onMessage = onMessage;
            try
            {
                var factory = new ConnectionFactory(config.BrokerUri);
                _connection = factory.CreateConnection(config.Username, config.Password);
                _connection.Start();

                // todo: use transactional ack to handle failures in execution
                // _session = _connection.CreateSession(AcknowledgementMode.Transactional);
                // with client ack for a specific message: https://activemq.apache.org/components/nms/msdoc/1.6.0/vs2005/Output/html/T_Apache_NMS_AcknowledgementMode.htm
                // CreateSession() without arguments uses the connection's default acknowledgement mode.
                _session = _connection.CreateSession();
                var destination = _session.GetQueue(config.QueueName);

                // Only messages whose "messageType" property equals the name of T are delivered to this consumer.
                var messageType = typeof(T).Name;
                var selector = $"messageType = '{messageType}'";
                _consumer = _session.CreateConsumer(destination, selector);

                _logger.LogInformation("Start consuming messages....");
                _consumer.Listener += OnTextMessage;
                _connection.ExceptionListener += ConnectionOnExceptionListener;
                _connection.ConnectionInterruptedListener += ConnectionOnConnectionInterruptedListener;
                _connection.ConnectionResumedListener += ConnectionOnConnectionResumedListener;
            }
            catch (Exception e)
            {
                return Result.Fail($"Cannot start handling messages, error: {e}");
            }

            return Result.Ok();
        }

        public Result StopConsuming()
        {
            try
            {
                _consumer.Listener -= OnTextMessage;
                _onMessage = null;
                _consumer.Dispose();
                _session.Dispose();
                _connection.ExceptionListener -= ConnectionOnExceptionListener;
                _connection.ConnectionInterruptedListener -= ConnectionOnConnectionInterruptedListener;
                _connection.ConnectionResumedListener -= ConnectionOnConnectionResumedListener;
                _connection.Stop();
                _connection.Dispose();
            }
            catch (Exception e)
            {
                return Result.Fail($"Cannot stop handling messages, error: {e}");
            }

            _logger.LogInformation("Stopped consuming messages....");
            return Result.Ok();
        }

        // see: https://docs.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming
        // for explanation on async void here
        // Note: because this is async void, the method returns to the NMS dispatcher at the first
        // incomplete await, so the dispatcher cannot observe whether _onMessage later fails.
        private async void OnTextMessage(IAmqMessage message)
        {
            if (!(message is ITextMessage textMessage))
            {
                _logger.LogCritical($"received message {message.NMSMessageId} is not a text message");
                return;
            }

            _logger.LogInformation($"Received message: {textMessage.Text}");
            if (_onMessage == null)
            {
                _logger.LogCritical($"cannot handle message {message.NMSMessageId}, onMessage is null");
                return;
            }

            try
            {
                await _onMessage(textMessage.Text);
            }
            catch (Exception e)
            {
                // An unhandled exception in an async void method would otherwise go unlogged here.
                _logger.LogError($"failed to handle message {message.NMSMessageId}, error: {e}");
            }
        }

        // Active MQ has automatic failover when the connection is interrupted
        // the connection is recreated & resumed automatically
        private void ConnectionOnConnectionResumedListener()
        {
            _logger.LogInformation("ActiveMQ consumer connection resumed");
        }

        private void ConnectionOnConnectionInterruptedListener()
        {
            _logger.LogInformation("ActiveMQ consumer connection interrupted");
        }

        private void ConnectionOnExceptionListener(Exception exception)
        {
            _logger.LogError($"ActiveMQ consumer connection caught exception: {exception}");
        }
    }
}

using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using CSharpFunctionalExtensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrchestratorService.Configuration;
using OrchestratorService.Logging;

namespace OrchestratorService.Queue.ActiveMq
{
    public class ActiveMqProducer : IQueueProducer, IDisposable
    {
        private readonly ISparkLogger<ActiveMqProducer> _logger;
        private readonly IConnection _connection;
        private readonly ISession _session;
        private readonly Apache.NMS.IMessageProducer _producer;

        public ActiveMqProducer(IOptions<QueueOptions> queueOptions, ISparkLogger<ActiveMqProducer> logger)
        {
            _logger = logger;
            var config = queueOptions.Value;
            var factory = new ConnectionFactory(config.BrokerUri);
            _connection = factory.CreateConnection(config.Username, config.Password);
            _connection.Start();
            _session = _connection.CreateSession();
            var destination = _session.GetQueue(config.QueueName);
            _producer = _session.CreateProducer(destination);
            _connection.ExceptionListener += ConnectionOnExceptionListener;
            _connection.ConnectionInterruptedListener += ConnectionOnConnectionInterruptedListener;
            _connection.ConnectionResumedListener += ConnectionOnConnectionResumedListener;
        }

        public void Dispose()
        {
            try
            {
                _producer.Dispose();
                _session.Dispose();
                _connection.ExceptionListener -= ConnectionOnExceptionListener;
                _connection.ConnectionInterruptedListener -= ConnectionOnConnectionInterruptedListener;
                _connection.ConnectionResumedListener -= ConnectionOnConnectionResumedListener;
                _connection.Stop();
                _connection.Dispose();
                // Log success only when every step above completed without throwing.
                _logger.LogInformation("producer disposed successfully");
            }
            catch (Exception e)
            {
                _logger.LogCritical($"failed to dispose producer, error: {e}");
            }
        }

        public Result QueueMessage<T>(string message)
        {
            var messageToSend = _session.CreateTextMessage(message);

            // create a property with the message type that can be used as a selector by the consumer.
            // this can be used later on for messages that are of other types than WorkItemMessage
            var messageType = typeof(T).Name;
            messageToSend.Properties.SetString("messageType", messageType);

            _logger.LogInformation($"Sending message to queue: {message}...");
            try
            {
                _producer.Send(messageToSend);
            }
            catch (Exception e)
            {
                var error = $"failed to add message to queue, \nerror: {e} \nmessage: {message}";
                return Result.Fail(error);
            }

            _logger.LogInformation($"message was added successfully to queue, message: {message}");
            return Result.Ok();
        }

        // Active MQ has automatic failover when the connection is interrupted
        // the connection is recreated & resumed automatically
        private void ConnectionOnConnectionResumedListener()
        {
            _logger.LogInformation("ActiveMQ producer connection resumed");
        }

        private void ConnectionOnConnectionInterruptedListener()
        {
            _logger.LogInformation("ActiveMQ producer connection interrupted");
        }

        private void ConnectionOnExceptionListener(Exception exception)
        {
            _logger.LogError($"ActiveMQ producer connection caught exception: {exception}");
        }
    }
}
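
The todo in the consumer's StartConsuming mentions acknowledging each message explicitly instead of relying on the session's default mode. A minimal sketch of that idea follows, assuming the AcknowledgementMode.IndividualAcknowledge mode from Apache.NMS is supported by the provider in use. The AckAfterHandling class, the Subscribe method and its parameters are hypothetical names for illustration and are not part of the attached classes. The handler is waited on synchronously on the dispatch thread, so a message is acknowledged only after processing has succeeded.

```csharp
using System;
using System.Threading.Tasks;
using Apache.NMS;

// Hypothetical helper for illustration only; not part of the attached classes.
public static class AckAfterHandling
{
    public static IMessageConsumer Subscribe(
        IConnection connection, string queueName, Func<string, Task> handler)
    {
        // Assumption: the provider supports individual acknowledgement.
        // With ClientAcknowledge, Acknowledge() would instead cover every
        // message the session has consumed so far.
        ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
        IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(queueName));

        consumer.Listener += message =>
        {
            try
            {
                if (message is ITextMessage text)
                {
                    // Block the dispatch thread until the handler has finished,
                    // so success or failure is known before acknowledging.
                    handler(text.Text).GetAwaiter().GetResult();
                }
                else
                {
                    Console.Error.WriteLine($"Discarding non-text message {message.NMSMessageId}");
                }

                // Reached only when the handler did not throw.
                message.Acknowledge();
            }
            catch (Exception e)
            {
                // The message is deliberately left unacknowledged here.
                Console.Error.WriteLine($"Handling of {message.NMSMessageId} failed: {e}");
            }
        };

        return consumer;
    }
}
```

If the handler throws, the message stays unacknowledged. It should then become eligible for redelivery once the session is recovered or closed, though this is worth verifying against the broker configuration in use.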