- preamble
-
News Release
- Carrying message headers
- Setting the Message Prefix
- Native support for delayed messages
- Parallel release of messages
-
transaction message
- Transaction Messaging
- Transaction Message Consumption
- Reimbursement of services
-
message processing
- serialize
- (machine) filter
- Message Retry
- multi-threaded processing
- Automatic recovery/reconnection
- Distributed Storage Lock
- Message Version Isolation
- Optimized Snowflake Algorithm
- Message auto-cleaning
-
Consumer Characteristics
- Attribute Subscription
- Multi-Attribute Subscription
- wildcard subscription
- asynchronous subscription
- Multi-Installation Subscription
- prefix subscription
- consumer group
- Fanout consumption
- implicit type consumption
- No type of consumption
- Interfacing Type Consumption
- Serial consumption
- parallel consumption
- backpressure mechanism
- Independent consumption
-
Control and Integration Features
- Start/Stop API
- Heterogeneous Systems Integration
- Multiple transmission support
- Multiple storage support
-
Monitoring, Diagnostics, Observability
- polyglot
- Real-time monitoring and visualization
- Manual View and Retry
- Tight integration with Core licenses
- Consul Node Discovery
- K8s Node Discovery
- Diagnostics API
- OpenTelemetry Support
- summarize
preamble
This is a list of all the features of CAP, including some features not mentioned in the official documentation, to see if there are features you do not know, but also for new users in the selection of a good reference.
CAP is now more than just a distributed transaction solution. It offers a number of features in message processing that allow it to navigate complex business scenarios while remaining lightweight, high-performance, and easy to use.
In terms of data compatibility, we have made a lot of efforts, the 3.0 version released 5 years ago can be helicoptered to the latest version without any changes. At the same time, the past 7 years of continuous improvement and refinement, in terms of stability has also won a lot of praise from users.
Project Profile:
- CAP is an open source solution for dealing with distributed transaction problems, the project was born in 2016, and currently has more than 6500+ Star and 110+ contributors in Github, as well as more than 8 million downloads in NuGet.
News Release
Carrying message headers
Message headers can be used to pass metadata information related to the message. CAP supports carrying customized headers in messages that can be read and processed on the consumer side. Header information is added when the message is published:
var headers = new Dictionary<string, string>
{
{ "", "first" },
{ "", "second" }
};
await ("", , headers);
Setting the Message Prefix
The use of prefixes allows for effective management and organization of messages, allowing for a certain hierarchical structure of Topics of different types or applications, and CAP allows for specific prefixes to be set for messages for easy classification and identification. It can be set in the configuration:
(x =>
{
= "prefix";
});
Native support for delayed messages
Delayed messages can be used to implement timed task scheduling execution, and can also be used to implement mechanisms such as retrying by sending delayed messages, as well as having multiple applications in scenarios such as cache invalidation, order timeout processing, traffic clipping, asynchronous workflows, and so on.CAP natively supports delayed messages without resorting to the capabilities of a message queue, e.g. delaying the release of a message by 100 seconds:
await ((100), "", );
Parallel release of messages
If you're looking for high performance when sending messages and don't particularly care about message order, CAP supports turning on the Parallel Publishing Messages configuration, where internally CAP processes the data in batches, which can dramatically increase the speed of publishing.
(x =>
{
= true;
});
transaction message
Transaction messages are the first major feature of CAP, in the process of building SOA or microservice systems, usually need to use events to integrate various services, in the process of simple use of message queues can not guarantee the ultimate consistency of the data, CAP uses and the current database integration of the local message table solution to solve the distributed system call each other in each section of the possible exceptions, it can guarantee that no event messages will be lost in any case. It guarantees that event messages will not be lost under any circumstances.
Background Knowledge:/savorboard/p/
Transaction Messaging
Sending transaction messages requires ensuring that both business messages and event messages are contained within a database transaction context to ensure data consistency. The following is an example of integration with local transactions:
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = (_capBus, autoCommit: true))
{
("insert into test(name) values('test’)",
transaction: (IDbTransaction));
await ("", );
}
}
If you are using a third party ORM, refer to the link below.
FreeSQL integration example:/dotnetcore/FreeSql/discussions/1202
SqlSugar integration example:/DotNetNext/SqlSugar/issues/1207
Chloe integration example:/shuxinqin/Chloe/issues/328
Transaction Message Consumption
Consumer-side transactions aren't really necessary since messages aren't lost anymore, but you can turn them on if you want. Consumer-side automated transactions are primarily turned on using the filters provided by CAP.
1、Create filters
public class MyCapFilter : SubscribeFilter {
private readonly AppDbContext _dbContext;
private IDbContextTransaction _transaction;
public MyCapFilter(AppDbContext dbContext){
_dbContext = dbContext;
}
public override void OnSubscribeExecuting(ExecutingContext context){
_transaction = _dbContext.();
}
public override void OnSubscribeExecuted(ExecutedContext context){
_transaction.Commit();
}
public override void OnSubscribeException( context){
_transaction.Rollback();
}
}
2、Configure the filter
(opt =>
{
// ***
}.AddSubscribeFilter<MyCapFilter>();
Reimbursement of services
When performing multi-service business transaction operations, downstream execution failures may be encountered. CAP supports the provision of a compensation mechanism for operation failures, which can be realized through a callback mechanism. Example:
await ("", ...,"");
[CapSubscribe("")]
public object DeductProductQty(JsonElement param)
{
var orderId = ("OrderId").GetInt32();
// business logic
return new { OrderId = orderId, IsSuccess = true };
}
message processing
serialize
By default CAP uses Json to serialize messages, Json is now basically the standard format for transmission and this works well in most cases. If you have other concerns about performance, compatibility, security, etc., you can use theISerializer
Interfaces customize implementations to support formats such as XML, GRPC, MessagePack, and so on.
Serialize Json using .NET by default Realization, if you want to change to
ferret outhere are。
<ISerializer, YourSerializer>();
(machine) filter
CAP filters are similar to filters in MVC in that they can be intercepted and handled before, after, and on execution exceptions.
public class MyCapFilter: SubscribeFilter {
public override Task OnSubscribeExecutingAsync(ExecutingContext context){
// Before the subscription method executes
}
public override Task OnSubscribeExecutedAsync(ExecutedContext context){
// After the subscription method executes
}
public override Task OnSubscribeExceptionAsync(ExceptionContext context){
// Subscription method execution exception
}
}
(opt =>
{
// ***
}.AddSubscribeFilter<MyCapFilter>();
Message Retry
CAP supports a retry mechanism when message sending or execution fails. The retry interval and the number of retries can be configured:
(x =>
{
= 5;
= 60;
});
retry and rewind
The retryer periodically scans for failed messages and retries them. For failed messages, the default query for the current time is set back 4 minutes, this is to ensure that new messages are not picked up by the retryer.
You can do this byFallbackWindowLookbackSeconds
The configuration item custom configures this value.
multi-threaded processing
Multi-consumer thread support
Multiple threads of execution support
Automatic recovery/reconnection
Auto-reconnect for message queues plays an important role in distributed systems and message-driven architectures, especially in improving system availability, reliability, and fault tolerance.CAP implements health checking and auto-reconnect for connections natively rather than client-side implementations, thus maximizing connection stability, improving system fault tolerance, and enabling consumers to CAP is able to automatically recover from problems without the need to reboot the system or application.
This is a built-in feature and does not require any configuration.
Distributed Storage Lock
The retry thread defaults to read the stored messages every 1 minute for retrying messages that failed to be sent or consumed. There is no problem with a single instance, but there is a certain chance of concurrent reads in scenarios where multiple instances are enabled. CAP supports the configuration of database-based distributed locks so that multiple instances of concurrent reads can be avoided and the handling of exception scenarios is also taken into account.
(x =>
{
= true;
});
Message Version Isolation
Version isolation is the use of versioning features to separate message objects by version, so as to isolate different versions of the business or instances.CAP supports version isolation features, which can be used in several business scenarios, such as rapid iteration of the business, the need for forward compatibility, multiple versions of the server-side interfaces, the use of the same table for different instances, and the differentiation of the messages of different developers during the development phase. This can be achieved by configuring the version number:
(x =>
{
= "v1";
});
Optimized Snowflake Algorithm
The snowflake algorithm is used to generate a unique identifier for a message. The snowflake algorithm used by CAP is optimized to solve the clock drift problem as well as the default use of the Mac's lower 10 digits as the WorkerId to ensure that it will not be duplicated. CAP also supports custom snowflake algorithms as follows:
<ISnowflakeId, SnowflakeId>();
Message auto-cleaning
CAP has an automatic cleanup mechanism to ensure that database tables do not grow indefinitely. By default, messages that are successfully processed expire after 1 day and are then cleaned up, and messages that fail to be processed expire after 15 days and are then cleaned up. You can use theSucceedMessageExpiredAfter
up toFailedMessageExpiredAfter
Self-define their expiration time.
The cleaner is scanned for outdated messages once every 5 minutes, and you can access them via theCollectorCleaningInterval
configuration item to customize the interval.
Consumer Characteristics
Attribute Subscription
Attribute-based subscription can reduce repetitive code , automatically complete the message subscription and processing of the binding , this approach allows developers to focus on the business logic , the details of the message passing , to improve code readability and maintainability .
Message subscription and consumption using Attribute tagging:
[CapSubscribe("")]
public void Show(DateTime datetime)
{
(datetime);
}
Multi-Attribute Subscription
CAP allows the use of multiple Attributes on the same method, which allows methods to subscribe to several different message topics at the same time, thus increasing the flexibility and extensibility of the system and code reuse.
[CapSubscribe("")]
[CapSubscribe("")]
public void Show(DateTime datetime)
{
(datetime);
}
wildcard subscription
Support for message subscription via wildcards greatly simplifies subscription management, especially when dealing with a large number of topics or complex message routing requirements.
You can use the*
cap (a poem)#
wildcard for bulk subscription. For example, subscribe to all messages that start with "test.":
- can replace one or more words, # can replace zero or more words
[CapSubscribe("test.*")]
public void CheckReceivedMessage(DateTime datetime)
{
(datetime);
}
asynchronous subscription
Support for asynchronous methods to subscribe to the message , you can pass CancellationToken support for cancellation , which can be used with the start-stop Api to achieve flexible concurrent processing and full utilization of resources .
[CapSubscribe("test.*")]
public async Task CheckReceivedMessage(DateTime datetime, CancellationToken cancellationToken)
{
(datetime);
}
Multi-Installation Subscription
When modularizing the design of a single system, which generally follows separation of concerns, subscribers may be distributed across different assemblies, CAP supports message subscription across multiple assemblies by simply defining the subscribe method on a class in a different assembly and then using theAddSubscriberAssembly
Just configure it, and then CAP will automatically look for discovery registrations.
(x =>
{
}).AddSubscriberAssembly(new Assembly[]{});
prefix subscription
Subscription prefix is used to set a prefix on the parent class when subscribing, then the methods in the class will automatically inherit this prefix, generally used for categorizing consumers.
[CapSubscribe("customers")]
public class CustomersCapController : ICapSubscibe
{
[CapSubscribe(".create", isPartial: true)]
public Task HandleCreationAsync(Customer customer) {
}
[CapSubscribe(".update", isPartial: true)]
public Task HandleUpdateAsync(Customer customer) {
}
}
consumer group
Consumer groups are a very important concept introduced by Kafka for efficient message consumption management and load balancing.CAP also supports the concept of consumer groups and adapts them for different message queues to give users a consistent experience.
[CapSubscribe("", Group = "group1")]
public void CheckReceivedMessage(DateTime datetime)
{
(datetime);
}
Fanout consumption
With the help of the abstracted notion of consumer groups, CAP supports fan-out consumption messages, i.e., multiple receipts in one send.
If multiple consumers of the same consumer group consume the same Topic message, only one consumer will be executed. Conversely, if consumers are all located in different consumer groups, all consumers are executed.
In the following example, theHello1
cap (a poem)Hello2
If you are in a different Group, then you will receive the message, so you can send and receive multiple messages.
[CapSubscribe("hello", Group = "bar")]
public void Hello1(){
("hello 1");
}
[CapSubscribe("hello", Group = "foo")]
public void Hello2(){
("hello 2");
}
implicit type consumption
Consumer parameter reception supports implicit message type conversion, e.g. if you send a string type date string, you can also use DateTime on the consumer side to receive it.
await <string>("hello", "2020/10/10 11:11");
[CapSubscribe("hello")]
public void Hello(DateTime time){
($"hello {time}");
}
No type of consumption
If you don't know the type of the published message or don't want to bother defining the type, CAP supports the use of the Receive all types.
await _capBus.PublishAsync("hello", );
await _capBus.PublishAsync("hello", 1345);
await _capBus.PublishAsync("hello", true);
await _capBus.PublishAsync("hello", "hello");
await _capBus.PublishAsync("hello", new Person{Name = "Jack", Age = 22});
[CapSubscribe("hello")]
public void Hello(JsonElement obj){
(());
}
Interfacing Type Consumption
CAP supports interface-based typed message consumption via extensions, see the example in the
Serial consumption
Most scenarios expect messages to be sent and processed in the same order, and in CAP messages are processed serially in order by default.
parallel consumption
In some business scenarios where the order of messages may not be very important, but you need to process the messages quickly, turn on parallel processing with the following configuration item.
Support for enabling parallel execution of multiple subscriptions is provided through theEnableSubscriberParallelExecute
to open.
When turned on, support is available through theSubscriberParallelExecuteThreadCount
The configuration item sets the number of threads.
Internally the CAP implementation works by first placing the message received by the consuming thread into a buffer and then deciding whether to execute it serially or in parallel, so another configuration parameter is also providedSubscriberParallelExecuteBufferFactor
to set the buffer size, which is a multiplicative factor with a buffer size value of (SubscriberParallelExecuteThreadCount
* SubscriberParallelExecuteBufferFactor
)。
backpressure mechanism
The throughput of message sending or consuming is not only related to the user's code, but also related to the current hardware load and processing capability. The design of buffer can maximize the throughput and resource utilization of message processing to a certain extent, but it is not a panacea. When the buffer is full, CAP will automatically enable the backpressure mechanism to reduce the responsiveness, which ensures that the memory is always safe and does not cause OOM.
As a simple example, there is a pool that is filled with water and released from the pool at the same time, and when the rate of injection is greater than the rate of release, then the pool will slowly overflow when it is full. The back pressure mechanism is equivalent to adding a control valve to the water injection port, when the pool is full will control the valve flow to maintain the balance of water injection and release.
Independent consumption
There are cases where certain consumers take a very long time to execute, which can jam other consumers with shorter execution times, resulting in subsequent messages not being consumed in a timely manner.
Therefore, in some business scenarios, it is necessary for some long-running consumers to have a separate data pipeline to isolate them from other normal consumers, so that they can execute independently.
You can subscribe in CAP by adding theGroupConcurrent
parameter to do this, which is a parallel parameter meaning that multiple threads of execution are also supported.
[CapSubscribe("hello", GroupConcurrent = 2)]
public void Hello2(){
("hello 2");
(1000);
}
Setting it to 2 in Hello2 means that when a new message arrives, up to 2 threads will be executing the method at the same time.
Control and Integration Features
Start/Stop API
By default, CAP starts with the Core host process, but if you want to manually control when it starts or stops, you can do so with theIBootstrapper
interface to dynamically control it, which is convenient in some of these scenarios where a specific time is needed to process a message.
Provides APIs for starting and stopping message processing, making it easy to control the lifecycle of message processing.
[Route("~/control/start")]
public async Task<IActionResult> Start([FromServices]IBootstrapper bootstrapper){
await ();
return Ok();
}
[Route("~/control/stop")]
public async Task<IActionResult> Stop([FromServices] IBootstrapper bootstrapper){
await ();
return Ok();
}
Heterogeneous Systems Integration
Messages sent may come from third-party heterogeneous systems rather than being generated by the sender side of CAP, which supports message interfacing with heterogeneous systems.
If a heterogeneous system sends a message without passing the Header information necessary for consumption, you can use theCustomHeadersBuilder
configuration item to add.
(z => {
= (msg, sp) =>
[
new(, <ISnowflakeId>().NextId().ToString()),
new(, )
];
});
Multiple transmission support
A variety of supported message queues are supported as transports, the following transports are currently supported.
- RabbitMQ
- Kafka
- Azure Service Bus
- Amazon SQS
- NATS
- In-Memory Queue
- Redis Streams
- Apache Pulsar
Other than that, here are the community-supported transmissions, thanks to the contributors.
- ActiveMQ (@Lukas Zhang): /lukazh
- RedisMQ (@moki):/difudotnet/
- ZeroMQ (@maikebing): /maikebing/
- MQTT (@john jiang): /jinzaz/
Multiple storage support
Supports a variety of popular message storage methods, including SQL Server, MySQL, PostgreSQL, MongoDB, InMemory, and more.
All except InMemory provide support for transactional messages.
Other than that, here are the community-supported transmissions, thanks to the contributors.
- SQLite (@colinin) :/colinin/
- LiteDB (@maikebing) :/maikebing/
- SQLite & Oracle (@cocosip) :/cocosip/CAP-Extensions
- SmartSql (@xiangxiren) :/xiangxiren/
Monitoring, Diagnostics, Observability
CAP provides a Dashboard for users to quickly view messages and real-time message sending and consumption.
CAP provides a Diagnostics API for easy access to Logging, Tracing and Metrics.
CAP provides support for the OpenTelemetry Observability Standard.
The Dashboard includes the following features:
- polyglot
- Real-time monitoring and visualization
- Manual View and Retry
- Tightly integrated licensing with Core
- Service Agents
- Consul for Service Discovery
- Service Discovery with Kubernetes
- Metrics metrics support
polyglot
Provide Chinese and English language support, switch the display in the upper right corner of the interface.
Real-time monitoring and visualization
Supports real-time charting of message processing status to help monitor message production and consumption execution.
Manual View and Retry
Based on the functionality provided by the Dashboard, you can view the details of the message including the header and content content.
Successful or failed messages can be sent or consumed again manually in the interface.
The Subscriber page provides a view of all the subscription methods under the current service, grouped based on Group, making it easy for users to view and search in a unified way.
Tight integration with Core licenses
Dashboard supports seamless integration with Core's authorization mechanism.
- anonymous access
(d =>{
= true;
});
- OpenId Connect
(options => {
(DashboardAuthorizationPolicy, policy => policy
.AddAuthenticationSchemes()
.RequireAuthenticatedUser());
})
.AddAuthentication(opt => = )
.AddCookie()
.AddOpenIdConnect(options => {
...
});
(cap => {
(d => {
= DashboardAuthorizationPolicy;
});
});
Check out the sample projects to learn more.
Consul Node Discovery
CAP is decentralized design, so to view the data of other nodes without opening the other nodes of the Dashboard should support node discovery, Dashboard built-in proxy function is used to proxy the other nodes of the API to return to the data in the current node of the Dashboard display.
Data can be viewed by simply switching between nodes, resulting in a better user experience.
Supports node discovery via Consul, which has been used by most companies in the past as a configuration center or registry for distributed systems.
For more information, check outofficial document。
K8s Node Discovery
Modern applications basically have K8S as the default cluster management and service registry..K8s
package provides support for K8s node discovery.
The CAP reads all the services in a particular K8S namespace and then pings those nodes that have the Dashboard API enabled to switch over to the agent.
Standalone Dashboard
Dashboard also supports standalone deployments that do not rely on service items, eliminating the need to configure theAddCap
And use the following directly:
();
For more information, check outofficial document。
Diagnostics API
CAP for .DiagnosticSource Support is provided for a listener with the nameCapDiagnosticListener
。
You can find more information on the Find the name of the event that CAP has defined below the class.
Diagnostics provides externally available event information are:
Before | After | Exception |
---|---|---|
Before Message Persistence | After message persistence | Message persistence exceptions |
Before the message is sent to the MQ | After the message is sent to the MQ | Message to MQ Exception |
Messages are saved from MQ consumption before | After the message is saved from MQ consumption | |
Before the subscriber method executes | After the subscriber method executes | Subscriber method execution exception |
CAP for .EventSource Support is provided for counters named。
Metrics provides the following metrics:
- Posting speed per second
- Consumption rate per second
- Rate of subscriber calls per second
- Average subscriber elapsed time per second to execute
OpenTelemetry Support
Distributed tracking has a very important role in modern applications. OpenTelemetry is an open standard supporting multiple programming languages and frameworks, providing cross-platform and cross-language compatibility, and facilitating the realization of unified monitoring and tracking in heterogeneous systems.
CAP can connect the upstream triggered message source link, and then pass to the downstream execution through each service, each transmission, each storage, the whole process will be recorded and restored, forming a complete invocation chain, which is convenient for performance optimization and problem troubleshooting. In addition, the message links sent during message consumption can also be accurately connected, making the invocation chain complete and accurate.
CAP Adopted package provides support for OpenTelemetry.
((builder) => builder
.AddAspNetCoreInstrumentation()
.AddCapInstrumentation()
.AddZipkinExporter()
);
summarize
That's it, that's all the features of CAP, if you have some more thoughts and ideas you can submit an Issue to us on Github.
If you have problems using it hopefully you can also give positive feedback to help make CAP better.
If you like this project, you can give us your support by clicking Star through the link below.
Thank you for your [recommendation] if you found this post helpful.
The address for this article:/savorboard/p/
Author's Blog:Savorboard
This article was originally licensed under: Attribution - Non-Commercial Use - No Interpretation, AgreementPlain text | ProtocolLegal texts