null

Kotlin + WebFlux + Bucket4K/Bucket4J + Lettuce. Усовершенствуем API с помощью Rate Limiting.

Введение

Rate Limiting (Ограничение скорости) - это важная стратегия ограничения доступа к API (и не только). Она ограничивает количество вызовов API, которые клиент может совершить за определенный промежуток времени. Это позволяет защитить API от чрезмерного использования, как непреднамеренного, так и злонамеренного.

В мире Java существует несколько различных готовых решений, реализующих стратегию ограничения скорости.

В этой статье мы рассмотрим одно из таких решений - популярную, надежную и производительную библиотеку Bucket4J.

Мы не будет создавать проект с нуля. Мы за основу возьмем проект, который реализовывали в рамках предыдущей статьи. Ее вы можете прочитать здесь.

Кратко напомню, что в ней мы настраивали проект в IntelliJ Idea для создания реактивного REST API приложения на Kotlin и WebFlux.

Также мы создавали демонстрационное приложение "АПИ интернет магазина обучающих курсов онлайн", используя реактивное программирование.

В этой статье, для краткости, я буду приводить лишь код, который отличается от кода, реализованного в прошлый раз. Однако, при необходимости, полный исходный код из этой статьи вы сможете найти в моем репозитории на гитхабе (здесь).

1 Добавление зависимостей

Первым делом давайте добавим в проект нужные зависимости.

Нам понадобятся следующие зависимости:

a) Bucket4k (https://github.com/ksletmoe/Bucket4k) - Kotlin-обертка для Bucket4j, которая может приостанавливать свою работу и отлично дружит с coroutines. Внутри себя также содержит код классического Bucket4J.

b) Kotlinx-coroutines-core - необходима нам для того, чтобы работать с coroutines

c) Bucket4j-redis и Lettuce-core - нужны для того, чтобы реализовать работу Bucket4j в распределенных системах

Файл build.gradle.kts со всеми нужными зависимостями выглядит так:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "3.1.4"
    id("io.spring.dependency-management") version "1.1.3"
    kotlin("jvm") version "1.8.22"
    kotlin("plugin.spring") version "1.8.22"
}

group = "com.tuneit"
version = "0.0.1-SNAPSHOT"

java {
    sourceCompatibility = JavaVersion.VERSION_17
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    implementation("org.springframework.boot:spring-boot-starter-validation:3.1.4")
    implementation("com.sletmoe.bucket4k:bucket4k:1.0.0")
    implementation("com.bucket4j:bucket4j-redis:8.2.0")
    implementation("io.lettuce:lettuce-core:6.2.6.RELEASE")
    runtimeOnly("org.postgresql:postgresql")
    runtimeOnly("org.postgresql:r2dbc-postgresql")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs += "-Xjsr305=strict"
        jvmTarget = "17"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

2 Ограничение скорости с помощью Bucket4k и поддержкой coroutines (Bucket4K + Coroutines)

Функционал Rate Limiting можно реализовать на уровне контроллера или даже эндпоинта. Однако при каждом добавлении нового контроллера и/или эндпоинта возникнет необходимость снова и снова писать однотипный повторяющийся код для подключения и настройки Bucket4J. Лучшим вариантом, который лишен вышеуказанной проблемы, является использование интерфейса Filter.

В классическом Spring им может стать OncePerRequestFilter, в реактивном Spring обычно используется WebFilter или его разновидность CoWebFilter, которая специально создана для Kotlin и поддерживает coroutines.

Так как мы используем реактивный Spring и Kotlin, то неплохой идеей будет попробовать реализовать асинхронную работу Bucket4J с помощью фичи Kotlin - coroutines.

Поэтому создадим package с названием filters и добавим в него класс "CoRateLimitFilter" со следующим содержимым:

package com.tuneit.coursesshopreactive.filters

import com.sletmoe.bucket4k.SuspendingBucket
import com.tuneit.coursesshopreactive.model.RateLimitException
import io.github.bucket4j.Bandwidth
import io.github.bucket4j.Refill
import org.springframework.http.HttpMethod
import org.springframework.stereotype.Component
import org.springframework.web.util.pattern.PathPattern
import org.springframework.web.util.pattern.PathPatternParser
import java.time.Duration
import kotlinx.coroutines.*
import org.springframework.core.Ordered
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.web.server.*
import java.security.MessageDigest
import kotlin.collections.HashMap
import kotlin.time.DurationUnit
import kotlin.time.toDuration


@Component
class CoRateLimitFilter : CoWebFilter(), Ordered {
    //A Kotlin wrapper around Bucket4j which suspends and plays nicely with coroutines.
    //https://github.com/ksletmoe/Bucket4k
    
    val localBuckets = HashMap<String, SuspendingBucket>()

    private fun createSuspendingBucket() : SuspendingBucket = SuspendingBucket.build {
        addLimit(Bandwidth.classic(50, Refill.intervally(50, Duration.ofHours(1))))
        addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))
    }

    private fun getBucketKeyForRemoteAddr(request: ServerHttpRequest): String {
        val ipFromHeader: String? = request.headers.getFirst("X-FORWARDED-FOR")
        val ip = if (ipFromHeader.isNullOrBlank()) request.remoteAddress.toString() else ipFromHeader
        return MessageDigest.getInstance("SHA-256")
                            .digest(ip.toByteArray())
                            .fold(StringBuilder()) { sb, it -> sb.append("%02x".format(it)) }.toString()
    }
    
    override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) {
        val request = exchange.request
        val response = exchange.response
        val bucketKey = getBucketKeyForRemoteAddr(request)
        val localBucket = localBuckets.getOrPut(bucketKey) { createSuspendingBucket() }
        if(urlMatches(request)) {
            val isConsumed = CoroutineScope(Dispatchers.IO).async {
                localBucket.tryConsume(1, 1.toDuration(DurationUnit.SECONDS))
            }.await()
            val bucketInfo = localBucket.toString().split("[", "]")[1].split(", ")
            val availableTokensInLongPeriod = bucketInfo[1].toInt().coerceAtLeast(0)
            val availableTokensInShortPeriod = bucketInfo[4].toInt().coerceAtLeast(0)
                if(isConsumed) {
                response.headers.set("X-Rate-Limit-Remaining",
                    "$availableTokensInShortPeriod/$availableTokensInLongPeriod"
                )
                return chain.filter(exchange)
            }
            throw RateLimitException("Too many requests")
        }
        return chain.filter(exchange)
    }

    val pathsToFilter: List<PathPattern> =
        listOf(PathPatternParser.defaultInstance.parse("/courses"),
            PathPatternParser.defaultInstance.parse("/courses/{id}"))

    private fun urlMatches(request: ServerHttpRequest) : Boolean {
        return pathsToFilter.any { it.matches(request.path.pathWithinApplication())}
                && request.method.matches(HttpMethod.GET.name())
    }

    override fun getOrder(): Int {
        return 1;
    }
}

Давайте рассмотрим основные моменты в реализации фильтра.

val localBuckets = HashMap<String, SuspendingBucket>()

Здесь мы объявляем карту, где будем хранить все Bucket (ведра), которые будут создаваться в ходе работы приложения.

Информацию о Buckets можно хранить также в сессии сервера, использовать внешние хранилища, такие как реляционные базы данных, или же другие продукты, предназначенные для реализации распределенных систем: Infinispan, Hazelcast, Coherence, Ignite, Redis.

Немного позже в этой статье мы рассмотрим как сконфигурировать Bucket4J для работы в распределенной системе с помощью Redis библиотеки - Lettuce.

Возвращаемся к фильтру.

Далее идет функция

private fun createSuspendingBucket() : SuspendingBucket = SuspendingBucket.build {
   addLimit(Bandwidth.classic(50, Refill.intervally(50, Duration.ofHours(1))))
   addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))
}

В ней создается и возвращается SuspendingBucket - Bucket, который хорошо работает с coroutines.

При создании Bucket мы указываем его лимиты и стратегию пополнения.

Перед тем как пойти дальше, хочется (и даже возможно нужно ;) ) немного коснуться теории.

Rate Limiting может быть реализовано с помощью множества различных алгоритмов: Token bucket, Leaky bucket, Fixed window counter, Sliding window log и др.

В сердце библиотеки Bucket4J лежит алгоритм Token bucket. Здесь, по ссылке, немного wiki про это (https://ru.wikipedia.org/wiki/Алгоритм_текущего_ведра)

Основы этого алгоритма понять несложно. Предположим, что у нас есть ведро (Bucket), в которое можно поместить какое-то максимальное количеством жетонов (токенов). Каждый раз, когда клиент (или Потребитель) хочет обратиться к сервису или запросить ресурс, он должен вынуть из ведра один или несколько жетонов. Потребитель (Consumer) может воспользоваться сервисом только в том случае, если он может извлечь необходимое количество жетонов. Если в ведре нет необходимого количества жетонов, ему нужно подождать, пока в ведре не будет достаточного количества.

Потребитель, тот кто пользуется нашим сервисом, он забирает жетоны.. А кто же их тогда кладет в ведро? В алгоритме существует также Пополнитель (Refiller), который периодически создает новые жетоны и кладет их в ведро.

Библиотека Bucket4J поддерживает два типа Пополнителя:

Refill.intervally(50, Duration.ofHours(1))

"Интервальный" Пополнитель "intervally" ждет, пока пройдет заданное количество времени, и затем кладет в ведро все жетоны сразу. В нашем примере он добавляет по 50 жетонов каждый час.

Refill.greedy(5, Duration.ofMinutes(1))

"Жадный" Пополнитель "greedy" добавляет жетоны более торопливо. В данном примере Refiller делит одну минуту на пять периодов. Затем в каждый из этих периодов он помещает в ведро по одному жетону. Другими словами он кладет в ведро по одному жетону каждые 12 секунд.

Важно еще раз отметить, что ведра имеют максимальную емкость. Поэтому Refiller кладет жетоны в ведро только до тех пор, пока не упрется в максимум. Если ведро заполнено (количество жетонов = вместимости), он больше не кладет в него жетоны.

Емкость задается при добавлении лимита:

addLimit(Bandwidth.classic(50, Refill.intervally(50, Duration.ofHours(1))))

Таким образом, максимальная емкость ведра в нашем случае составляет 50 жетонов.

И если эти жетоны израсходовать в течении часа, то нужно будет ждать окончание этого часа, чтобы заполучить новые 50 жетонов.

А зачем необходим второй лимит?

addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))

Вторым лимитом мы ограничиваем потребление жетонов в более коротком промежутке времени. Так, в течении минуты мы не можем потребить сразу более 5 жетонов.

Если соединить эти два лимита воедино, то получим: что на час нам дается "потолок" в размере 50 жетонов, которые пополняются единожды каждый час (вновь до 50-ти), при этом в течении минуты максимум мы единоразово может потребить только 5 жетонов и не более. Лимит в 5 жетонов/мин пополняется каждые 12 секунд по одному жетону. Все это значит, что клиенты нашего АПИ смогут обратиться к нему не более 50-ти раз в течении часа (затем им нужно будет ждать окончание этого часа) и смогут совершить не более 5-ти одновременных вызовов в течении минуты или не более 5 последовательных вызовов в течении 12 секунд, а потом им придется ждать каждые 12 секунд, чтобы сделать еще одни вызов (уверен суть вы поняли).

P.S. теорию я кстати взял из классной статьи вот здесь (https://golb.hplar.ch/2019/08/rate-limit-bucket4j.html)

Разобравшись с теорией и лимитами, продолжим разбор исходного кода нашего фильтра.

Рассмотрим следующую функцию:

private fun getBucketKeyForRemoteAddr(request: ServerHttpRequest): String {
        val ipFromHeader: String? = request.headers.getFirst("X-FORWARDED-FOR")
        val ip = if (ipFromHeader.isNullOrBlank()) request.remoteAddress.toString() else ipFromHeader
        return MessageDigest.getInstance("SHA-256")
                            .digest(ip.toByteArray())
                            .fold(StringBuilder()) { sb, it -> sb.append("%02x".format(it)) }.toString()
    }

Ограничения в API часто делаются по IP-адресу или более специфичным для бизнеса способом, например, с помощью ключей API или маркеров доступа.

В нашем примере, мы делаем ограничение по IP-адресу. Для каждого уникального пользователя API (уникальность определяется по ip), мы создаем и храним свой Bucket.

Функция выше определяет с какого ip приходит запрос к АПИ, вычисляет и возвращает hash от этого ip (чтобы не хранить ip в явном виде; забота о личных данных как говорится :)). Этот hash мы используем в качестве ключа для карты, в которой храним все наши Buckets.

Далее идет главная функция фильтра - filter (), где разыгрывается основное действие:

по данным http-запроса, определяем ip адрес пользователя АПИ. По этому адресу вычисляем ключ для карты. Далее смотрим есть ли в карте данные по этому ключу. Если нет, то создаем новый Bucket и помещаем в карту, если данные уже есть - то достаем существующий Bucket из карты.

Затем смотрим соответствует ли адрес запроса определенному нами шаблону:

    val pathsToFilter: List<PathPattern> =
        listOf(PathPatternParser.defaultInstance.parse("/courses"),
            PathPatternParser.defaultInstance.parse("/courses/{id}"))

    private fun urlMatches(request: ServerHttpRequest) : Boolean {
        return pathsToFilter.any { it.matches(request.path.pathWithinApplication())}
                && request.method.matches(HttpMethod.GET.name())
    }

В примере мы сделали так, что ограничения Rate Limiting накладываются только на GET-эндпоинты courses и courses/{id}

Если адрес запроса соответствует шаблону, то выполняем все необходимые мероприятия по Rate Limiting, иначе - фильтр ничего не делает и просто передает эстафету дальше по цепи.

  val isConsumed = CoroutineScope(Dispatchers.IO).async {
                localBucket.tryConsume(1, 1.toDuration(DurationUnit.SECONDS))
            }.await()

Здесь мы с помощью coroutine асинхронно пытаемся вытащить из Bucket один жетон.

Если попытка успешная, то есть isConsumed = true, то мы разрешаем фильтру передать управление дальше по цепи, то есть разрешаем обычный цикл запроса к АПИ.

Пользователь при этом получит желаемый ответ на свой запрос.

При этом в ответ мы добавляем заголовок X-Rate-Limit-Remaining, в котором содержится кол-во оставшихся доступных токенов.

Так как почему-то в реализации Bucket4K не работает нормально расчет оставшегося кол-ва жетонов, мне пришлось доставать эти данные из мета-данных Bucket:

   val bucketInfo = localBucket.toString().split("[", "]")[1].split(", ")
            val availableTokensInLongPeriod = bucketInfo[1].toInt().coerceAtLeast(0)
            val availableTokensInShortPeriod = bucketInfo[4].toInt().coerceAtLeast(0)

Если же в Bucket недостаточно жетонов, то мы выбрасываем ошибку RateLimitException и прерываем цикл выполнения запроса к АПИ. Пользователь увидит ошибку: "Слишком много запросов".

Класс ошибки RateLimitException находится в файле Errors.kt и реализован следующим образом:

@ResponseStatus(HttpStatus.TOO_MANY_REQUESTS)
data class RateLimitException(val msg: String) : RuntimeException(msg)

Вот что видит пользователь API, когда жетонов в ведре достаточно и доступ разрешен:

А вот, что он видит когда все жетоны израсходованы:

3 Ограничение скорости с помощью Bucket4J в распределенной системе (Bucket4J + WebFlux + Redis Lettuce)

В предыдущем примере, информация о Buckets хранилась в памяти приложения. Это неплохо когда кол-во пользователей АПИ небольшое и нагрузки невелики. Однако обычно в реальных приложения дела обстоят с точностью наоборот. Для компенсации нагрузки и обеспечения стабильности работы сервисов, в таких случаях, обычно одновременно работают несколько экземпляров приложения на разных серверах, а балансировщик равномерно распределяет нагрузку между этими серверами. Такую схему работы обычно называют распределенной или distributed. Локальное хранение Buckets в памяти каждого из экземпляров приложения будет ошибочным при такой схеме, ведь нам нужно чтобы все экземпляры имели общее хранилище Buckets, а не каждый свое. Другими словами нам нужен единый лимит токенов для всех экземпляров приложения в распределенной системе. Как упоминалось в статье выше, к счастью, Bucket4J поддерживает разные продукты для работы в распределенных средах. Мне как то по душе больше Redis и поэтому в примере я решил использовать его, а именно одну из библиотек на его основе - Lettuce. Эта библиотека прекрасно дружит с WebFlux, асинхронна, поэтому я выбрал ее.

Однако есть и ложка дегтя во всей этой истории.. как оказалось Bucket4K пока еще не поддерживает Redis... (ну или я не смог разобраться, но все же кажется что не поддерживает). Поэтому придется попрощаться с coroutines... и вернуться к webflux да и только. Но не беда! Как оказалось, после сравнения производительности Bucket4K + coroutines vs Bucket4J + WebFlux... оказалось, что последний союз работает быстрее, а иногда и в 10-ки раз быстрее.

И так приступим!

Для начала, запустим экземпляр Redis в докере (в терминале: docker-compose up -d).

docker-compose.yml:

version: '3.8'

services:
  postgres:
    container_name: "reactive-postgres"
    image: postgres
    ports:
      - "127.0.0.1:5432:5432"
    environment:
      POSTGRES_USER: demo
      POSTGRES_PASSWORD: demo
      POSTGRES_DB: coursesshopreactive

  redis-local:
    container_name: "reactive-redis"
    image: redis
    ports:
      - "6379:6379"   

Готово. Супер.

Далее, надо в файле application.yml добавить несколько строк, связанных с Redis:

spring:
 r2dbc:
  url: "r2dbc:postgresql://localhost:5432/coursesshopreactive"
  username: demo
  password: demo
 data:
  redis:
   host: localhost
   port: 6379

server:
  port: 8080
  error:
   include-message: always

И наконец, нужно создать конфигурационный класс, посвященный Lettuce.

Для этого, создадим package "configuration" и в нем создадим класс "LettuceConfig":

package com.tuneit.coursesshopreactive.configuration;

import io.github.bucket4j.distributed.ExpirationAfterWriteStrategy
import io.github.bucket4j.redis.lettuce.cas.LettuceBasedProxyManager
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Duration

@Configuration
class LettuceConfig (@Value("\${spring.data.redis.host}") val host : String,
                     @Value("\${spring.data.redis.port}") val port: Int) {
    
    @Bean
    fun redisClient(): RedisClient {
        return RedisClient.create(RedisURI.builder().withHost(host).withPort(port).build())
    }

    @Bean
    fun lettuceProxyManager(): LettuceBasedProxyManager {
        return LettuceBasedProxyManager.builderFor(redisClient())
                                       .withExpirationStrategy(ExpirationAfterWriteStrategy
                                       .basedOnTimeForRefillingBucketUpToMax(Duration.ofSeconds(10)))
                                       .build()
    }
}

Этот класс я не буду разбирать подробнее, он вполне стандартный и все нюансы настройки RedisClient и LettuceBasedProxyManager вы сможете легко найти в интернете сами.

Поэтому, продолжим.

Как вы могли наверное догадаться, тот CoRateLimitFilter, который мы использовали в предыдущем кейсе, нам не подходит. Во-первых, в текущем кейсе мы уже не можем использовать coroutines и, во-вторых, нам нужно реализовать взаимодействие с Lettuce.

Вместо CoRateLimitFilter, мы напишем новый другой фильтр.

В package "filters" добавим класс "RateLimitFilter":

package com.tuneit.coursesshopreactive.filters

import com.tuneit.coursesshopreactive.model.RateLimitException
import io.github.bucket4j.Bandwidth
import io.github.bucket4j.BucketConfiguration
import io.github.bucket4j.ConsumptionProbe
import io.github.bucket4j.Refill
import io.github.bucket4j.distributed.AsyncBucketProxy
import io.github.bucket4j.redis.lettuce.cas.LettuceBasedProxyManager
import org.springframework.boot.web.reactive.filter.OrderedWebFilter
import org.springframework.http.HttpMethod
import org.springframework.http.server.reactive.ServerHttpRequest
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilterChain
import org.springframework.web.util.pattern.PathPattern
import org.springframework.web.util.pattern.PathPatternParser
import reactor.core.publisher.Mono
import java.security.MessageDigest
import java.time.Duration
import java.util.concurrent.TimeUnit


@Component
class RateLimitFilter (val lettuceProxyManager: LettuceBasedProxyManager): OrderedWebFilter {

    private fun getBucketConfig(): BucketConfiguration {
        val conf = BucketConfiguration.builder()
        conf.addLimit(Bandwidth.classic(50, Refill.intervally(50, Duration.ofHours(1))))
        conf.addLimit(Bandwidth.classic(5, Refill.greedy(5, Duration.ofMinutes(1))))
        return conf.build()
    }

    private fun getBucketKeyForRemoteAddr(request: ServerHttpRequest): ByteArray {
        val ipFromHeader: String? = request.headers.getFirst("X-FORWARDED-FOR")
        val ip = if (ipFromHeader.isNullOrBlank()) request.remoteAddress.toString() else ipFromHeader
        return MessageDigest.getInstance("SHA-256").digest(ip.toByteArray())
    }

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val request = exchange.request
        val bucketKey = getBucketKeyForRemoteAddr(request)
        val bucket: AsyncBucketProxy = lettuceProxyManager.asAsync()
                                                         .builder()
                                                         .build(bucketKey, getBucketConfig())
        if(urlMatches(request)) {
          return Mono
                .fromFuture(bucket.tryConsumeAndReturnRemaining(1))
                .flatMap { handleConsumptionProbe(exchange, chain, it) }
        }
        return chain.filter(exchange)
    }

    val pathsToFilter: List<PathPattern> =
        listOf(PathPatternParser.defaultInstance.parse("/courses"),
               PathPatternParser.defaultInstance.parse("/courses/{id}"))
    
    private fun urlMatches(request: ServerHttpRequest) : Boolean {
        return pathsToFilter.any { it.matches(request.path.pathWithinApplication())}
               && request.method.matches(HttpMethod.GET.name())
    }

    private fun handleConsumptionProbe(exchange: ServerWebExchange,
                                       chain: WebFilterChain,
                                       probe: ConsumptionProbe) : Mono<Void>  {
        val response = exchange.response
        if(probe.isConsumed) {
           response.headers.set("X-Rate-Limit-Remaining", probe.remainingTokens.toString())
           return chain.filter(exchange)
        }
        response.headers.set("X-Rate-Limit-Retry-After-Seconds",
                             TimeUnit.NANOSECONDS.toSeconds(probe.nanosToWaitForRefill).toString())
        return Mono.error(RateLimitException("Too many requests"))
    }

    override fun getOrder(): Int {
        return 1;
    }
}

В нем реализован практически аналогичный функционал, что и в предыдущем примере. За тем исключением, что в этот раз данные о Buckets хранятся и берутся из Redis.

val bucket: AsyncBucketProxy = lettuceProxyManager.asAsync()
                                                         .builder()
                                                         .build(bucketKey, getBucketConfig())

А также то, что асинхронность вместо coroutines достигается с помощью реактивного WebFlux.

  return Mono
                .fromFuture(bucket.tryConsumeAndReturnRemaining(1))
                .flatMap { handleConsumptionProbe(exchange, chain, it) }

Ну и в данном случае у нас нормально работают подсчеты оставшегося кол-ва токенов и есть возможность вычислить кол-во секунд до ближайшего заполнения.

   response.headers.set("X-Rate-Limit-Remaining", probe.remainingTokens.toString())

   response.headers.set("X-Rate-Limit-Retry-After-Seconds",
                             TimeUnit.NANOSECONDS.toSeconds(probe.nanosToWaitForRefill).toString())

Вот что увидит пользователь API, когда жетонов в ведре достаточно и доступ разрешен:

И вот, что он увидит когда все жетоны будут израсходованы:

P.S. Также приведу итоговую структуру проекта, чтобы вы могли сравнить и убедиться, что все нужные классы у вас на месте:

Заключение (которое мне почти помог написать ChatGPT)

Bucket4j является мощным и универсальным инструментом для реализации стратегии Ограничения скорости в Java/Kotlin-приложениях. Независимо от того, стремитесь ли вы повысить безопасность, защитить ресурсы или обеспечить сбалансированное использование вашего приложения, Bucket4j предоставляет надежное решение. Его простота, гибкость и применимость в реальных условиях делают его ценным дополнением к инструментарию Java/Kotlin-разработчиков, стремящихся оптимизировать производительность и надежность своих приложений. Используя Bucket4j, разработчики могут достичь гармоничного баланса между эффективной обработкой запросов, стабильностью системы и улучшенным пользовательским опытом.

Спасибо за чтение! Оставайтесь с нами, stay tune-it :)