Some time ago I came across this issue in the pulsar-client-go community:
import "/apache/pulsar-client-go/pulsar"
client, err := ({
URL: "pulsar://localhost:6650",
})
if err != nil {
(err)
}
consumer, err := ({
Topic: "persistent://public/default/mq-topic-1",
SubscriptionName: "sub-1",
Type: ,
ReceiverQueueSize: 0,
})
if err != nil {
(err)
}
// less than or equal to 0 will be set to 1000
const (
defaultReceiverQueueSize = 1000
)
if <= 0 {
= defaultReceiverQueueSize
}
The reporter found that when ReceiverQueueSize is manually set to 0 in pulsar-client-go, the client resets it to 1000 during initialization.
if < 0 {
= defaultReceiverQueueSize
}
If you patch the source so that the value is allowed to stay at 0, as in the snippet above, consumption breaks: the consumer stays in a waiting state and never receives any data.
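The difference between the two checks can be seen in a minimal sketch. The helper names normalizeOriginal and normalizePatched are hypothetical and not part of pulsar-client-go; only the constant and the two comparisons come from the snippets above.

```go
package main

import "fmt"

const defaultReceiverQueueSize = 1000

// normalizeOriginal mirrors the `<= 0` check: zero and negative values
// are both replaced by the default.
func normalizeOriginal(size int) int {
	if size <= 0 {
		return defaultReceiverQueueSize
	}
	return size
}

// normalizePatched mirrors the `< 0` check: only negative values are
// replaced, so an explicit 0 survives.
func normalizePatched(size int) int {
	if size < 0 {
		return defaultReceiverQueueSize
	}
	return size
}

func main() {
	for _, size := range []int{-1, 0, 10} {
		fmt.Printf("size=%d original=%d patched=%d\n",
			size, normalizeOriginal(size), normalizePatched(size))
	}
}
```

With the patched check the value 0 gets through, but that alone only changes the stored size; it does nothing to make the consumer request messages.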
My investigation showed that Pulsar's Go client lacked an equivalent of the Java client's ZeroQueueConsumerImpl class, which exists mainly to allow fine-grained control over consumption.
If you'd like to have tight control over message dispatching across consumers, set the consumers' receiver queue size very low (potentially even to 0 if necessary). Each consumer has a receiver queue that determines how many messages the consumer attempts to fetch at a time. For example, a receiver queue of 1000 (the default) means that the consumer attempts to process 1000 messages from the topic's backlog upon connection. Setting the receiver queue to 0 essentially means ensuring that each consumer is only doing one thing at a time.
/docs/next/cookbooks-message-queue/#client-configuration-changes
As the official documentation says, ReceiverQueueSize can be set to 0, which lets the consumer take messages one at a time instead of piling them up in the client-side queue.
Client-side consumption logic
Let's take this opportunity to review the Pulsar client's consumption logic, so that we can understand what ReceiverQueueSize does and how to implement ZeroQueueConsumerImpl in pulsar-client-go.
The Pulsar client's consumption model is based on a combination of push and pull:
As the flow diagram shows, when the consumer starts it proactively sends a Flow command to the broker, telling the broker how many messages to push down to the client.
The ReceiverQueueSize parameter is also used as the size of the client's internal queue, where the pushed messages are stored.
A later call to the receive function then takes data directly from this queue.
Each successfully consumed message increases an internal AvailablePermit counter by 1. Once the counter reaches MaxReceiveQueueSize / 2, the client sends another Flow command, telling the broker to push more messages.
The critical event here is sending the Flow command to the broker, because that is what causes new messages to be pushed down to the client.
Colleagues in R&D have often asked me to troubleshoot cases where messages could not be consumed, and the cause I end up finding is almost always slow consumption: AvailablePermit does not grow, so nothing triggers the broker to push new messages to the client.
The symptom people see is very slow consumption.
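The permit accounting described above can be modelled in a few lines. This is a single-threaded sketch, not the client's actual code: the permitTracker type and its methods are hypothetical, and it assumes the initial Flow command asks for a full queue of messages.

```go
package main

import "fmt"

// permitTracker is a hypothetical model of the client-side permit accounting.
type permitTracker struct {
	queueSize int   // plays the role of ReceiverQueueSize
	available int   // permits accumulated since the last Flow command
	flows     []int // permits carried by each Flow command sent so far
}

// newPermitTracker records the initial Flow command
// (assumed here to request a full queue of messages).
func newPermitTracker(queueSize int) *permitTracker {
	tracker := &permitTracker{queueSize: queueSize}
	tracker.flows = append(tracker.flows, queueSize)
	return tracker
}

// onMessageConsumed adds one permit and records a new Flow command once the
// accumulated permits reach half of the queue size.
func (p *permitTracker) onMessageConsumed() {
	p.available++
	if p.available >= p.queueSize/2 {
		p.flows = append(p.flows, p.available)
		p.available = 0
	}
}

func main() {
	tracker := newPermitTracker(1000)
	for i := 0; i < 1200; i++ {
		tracker.onMessageConsumed()
	}
	// prints: flow commands: [1000 500 500] pending permits: 200
	fmt.Println("flow commands:", tracker.flows, "pending permits:", tracker.available)
}
```

In this model a slow consumer simply calls onMessageConsumed rarely, so the counter takes a long time to reach the threshold and no new Flow command goes out in the meantime.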
ZeroQueueConsumerImpl Principle
Now let's look at how ZeroQueueConsumerImpl manages to consume messages even though its queue size is 0.
When a consumer is built, the queue size determines whether a regular consumer or a ZeroQueueConsumerImpl is created.
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFuture<Message<T>> future = ();
if (!()) {
// We expect the message to be not in the queue yet
increaseAvailablePermits(cnx());
}
return future;
}
This is the consume method that ZeroQueueConsumerImpl overrides; the key part is the call to increaseAvailablePermits(cnx()).
void increaseAvailablePermits(ClientCnx currentCnx) {
    increaseAvailablePermits(currentCnx, 1);
}

protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
    int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
    while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
        if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
            sendFlowPermitsToBroker(currentCnx, available);
            break;
        } else {
            available = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }
}
As the source shows, the logic increments AvailablePermit and asks the broker to send messages once the threshold is reached.
Because the queue size in ZeroQueueConsumerImpl is 0, the condition available >= getCurrentReceiverQueueSize() / 2 is always true.
So every receive asks the broker to send one more message, which gives precise control over each individual message.
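To see the effect of a zero threshold, here is a rough Go rendering of the same compare-and-swap loop. It is a sketch only: the flowController type is hypothetical, the paused flag is left out, and the Flow command is replaced by appending to a slice, so it should be driven from a single goroutine.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flowController is a hypothetical Go rendering of the Java permit loop.
type flowController struct {
	availablePermits int32
	queueSize        int32
	sent             []int32 // permits carried by each simulated Flow command
}

// increase adds delta permits and, once the threshold is reached,
// atomically resets the counter and records a simulated Flow command.
func (f *flowController) increase(delta int32) {
	available := atomic.AddInt32(&f.availablePermits, delta)
	for available >= f.queueSize/2 {
		if atomic.CompareAndSwapInt32(&f.availablePermits, available, 0) {
			f.sent = append(f.sent, available)
			return
		}
		// Another update won the race; reload and re-check the threshold.
		available = atomic.LoadInt32(&f.availablePermits)
	}
}

func main() {
	zeroQueue := &flowController{queueSize: 0}
	regular := &flowController{queueSize: 4}
	for i := 0; i < 3; i++ {
		zeroQueue.increase(1)
		regular.increase(1)
	}
	fmt.Println("zero queue flows:", zeroQueue.sent) // prints: zero queue flows: [1 1 1]
	fmt.Println("regular flows:", regular.sent)      // prints: regular flows: [2]
}
```

With a queue size of 0 the threshold is 0, so each single permit immediately becomes a Flow command for exactly one message.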
Implementation in pulsar-client-go
To bring this capability to pulsar-client-go, I submitted a PR.
From the analysis above, we already know why consumption stops when ReceiverQueueSize is manually forced to 0.
The root cause is that with a queue size of 0 at initialization, no Flow command is sent to the broker, so no messages are pushed to the client and no data can be consumed.
So we need to follow Java's ZeroQueueConsumerImpl and manually increase availablePermits on every receive.
For this reason I added a new consumer type, zeroQueueConsumer.
// EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0.
// Notice: only non-partitioned topic is supported.
// Default is false.
EnableZeroQueueConsumer bool
consumer, err := (ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
NackRedeliveryDelay: 1 * ,
EnableZeroQueueConsumer: true,
})
if {
= 0
}
When creating a consumer, you specify whether to enable the zero-queue consumer through EnableZeroQueueConsumer. When it is enabled, ReceiverQueueSize is set to 0.
// The default value can be set.
private int receiverQueueSize = 1000;
In Go, you can't give a struct field a default value at declaration the way you can in Java, and since the zero value of Go's int type is 0, it's impossible to tell whether ReceiverQueueSize=0 was set deliberately by the user or is just the zero value of a field that was never set.
That's why a new parameter is needed to state explicitly whether the zero-queue consumer should be used.
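The ambiguity is easy to reproduce. In the sketch below, consumerOptions is a hypothetical, trimmed-down stand-in for the real options struct, and effectiveQueueSize is an illustrative helper rather than the client's actual function.

```go
package main

import "fmt"

const defaultReceiverQueueSize = 1000

// consumerOptions is a hypothetical, trimmed-down options struct.
type consumerOptions struct {
	ReceiverQueueSize       int
	EnableZeroQueueConsumer bool
}

// effectiveQueueSize resolves the queue size: the explicit flag wins,
// otherwise non-positive sizes fall back to the default.
func effectiveQueueSize(o consumerOptions) int {
	if o.EnableZeroQueueConsumer {
		return 0
	}
	if o.ReceiverQueueSize <= 0 {
		return defaultReceiverQueueSize
	}
	return o.ReceiverQueueSize
}

func main() {
	unset := consumerOptions{}
	explicitZero := consumerOptions{ReceiverQueueSize: 0}

	// The two values are identical, so the field alone cannot carry intent.
	fmt.Println(unset == explicitZero) // prints: true

	fmt.Println(effectiveQueueSize(unset))                                          // prints: 1000
	fmt.Println(effectiveQueueSize(consumerOptions{EnableZeroQueueConsumer: true})) // prints: 0
}
```

A pointer field or a sentinel value could also express "unset", but a separate boolean keeps the existing field's behavior unchanged for current users.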
Later, when the consumer is created, a zeroQueueConsumer is created only if the topic is non-partitioned and EnableZeroQueueConsumer is turned on.
The number of partitions is obtained by having the broker return it in response to the PARTITIONED_METADATA command.
func (z *zeroQueueConsumer) Receive(ctx ) (Message, error) {
if state := (); state == consumerClosed || state == consumerClosing {
("state", state).Error("Failed to ack by closing or closed consumer")
return nil, ("consumer state is closed")
}
()
defer ()
()
for {
select {
case <-:
return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-:
if !ok {
return nil, newError(ConsumerClosed, "consumer closed")
}
return , nil
case <-():
return nil, ()
}
}
}
The key line is the one that increases availablePermits.
The consumption logic is the same as in Java's ZeroQueueConsumerImpl: availablePermits is increased before each message is consumed.
pulsar-client-go works like the Java client in that it stores messages in an internal queue, so each receive only needs to take a message from this queue, messageCh.
Note that the zeroQueueConsumer in pulsar-client-go does not support reading the internal queue directly.
func (z *zeroQueueConsumer) Chan() <-chan ConsumerMessage {
panic("zeroQueueConsumer cannot support Chan method")
}
Calling Chan() panics. When the user reads the channel directly, the client has no point at which it can send Flow commands on the user's behalf, so this method has to be disabled and messages can only be obtained by actively calling Receive.
A long time ago I also drew a flowchart of how the Pulsar client consumes messages, and I'm thinking of writing another article that analyzes how the Pulsar client works internally.
Reference Links:
- /apache/pulsar-client-go/issues/1223
- /developer/article/2307608
- /docs/next/cookbooks-message-queue/#client-configuration-changes
- /apache/pulsar-client-go/pull/1225