Login Form

Supongamos que tenemos un servicio A que recibe unos 5 millones de requests al día lo cual da en promedio un total de 3,472 requests por minuto. El servicio, en ciertos casos, necesita llamar un servicio B para obtener parte de la información que necesita para generar la respuesta adecuada pero dicho servicio no soporta tanta carga y tiene un extraño requerimiento de no permitir, en un momento dado, más de 20 requests simultáneos por parte del servicio A, es decir, en un momento dado, el servicio A no debe de tener más de 20 conexiones abiertas esperando una respuesta del servicio B.

Para complicar un poco más las cosas, el servicio A corre en la nube y es auto-escalable por lo que podríamos llegar a tener varias instancias corriendo en un momento dado, todas ellas haciendo peticiones al servicio B.

 

La tarea entonces, es encontrar una manera de asegurarnos que, en un momento dado, la suma de las peticiones hechas al servicio B por todas las instancias del servicio A no pasen de 20. A las técnicas que resuelven este tipo de problemas, en inglés, se les conoce como "rate limiters", "metering" o "throttle mechanisms" y son bastante comunes en servicios que manejan un gran volumen de usuarios concurrentes siendo utilizadas por lo general por grandes empresas para proteger sus back-ends o limitar el uso de sus servicios entre otras posibles razones.

 

En este artículo describo tres variantes de una solución utilizando colas junto con una pequeña implementación de la misma en java. La propuesta, a alto nivel, consiste en un micro-servicio que funja como intermediario entre los servicios AB regulando el tráfico.

 

 El siguiente diagrama ilustra el concepto poniendo como ejemplo 3 instancias del servicio A

 

 

 

 

 

 

El servicio (ThrottlingService) es un microservicio REST que expone un endpoint al cuál debe dirigir sus peticiones el servicio A. Actuando como puente, llama al servicio B y regresa la respuesta al servicio A.  En realidad se exponen tres endpoints, uno por cada variante del mecanismo específico para regular el tráfico. A continuación se listan los mecanismos ordenados del más simple al más complejo:

 

  • Utilizando un semáforo
  • Utilizando un ThreadPoolExecutor de tamaño fijo
  • Utilizando un Broker soportando procesamiento distribuido

 

Antes de comenzar con la explicación de cada uno, se presenta un pequeño diagrama de clases para que el código de ejemplo mostrado más adelante tenga sentido. 

 

 

Como se puede ver, se tiene una interfaz RestThrottleService que representa los endpoints expuestos por el servicio junto con su implementación (RestThrottleServiceImpl). La capa rest tiene dependencia con la interfaz ThrottleServiceThrottleService define el servicio que regula el tráfico y sólo expone un método throttle para atender los requests, llamar al servicio B y regresar el resultado. Se tienen tres implementaciones, una por cada mecanismo: SemaphoreThrottleServiceImpl para la regulación por medio de un semáforo, ThreadPoolThrottleServiceImpl que utiliza un ThreadPool de tamaño fijo y DistributedThrottleServiceImpl que utiliza un broker.

  

Para simplificar las cosas, el parámetro de entrada y el resultado son una cadena y por supuesto, en la implementación no estamos llamando ningún servicio B, simplemente simulamos que hacemos una llamada a través de la clase Command implementando un delay aleatorio.

 

Semáforo

Este mecanismo es el más simple de todos ya que utiliza la clase Sempahore provista por el jdk. Esta clase implementa un mecanismo de semáforo permitiendo especificar un número dado de permisos y actua a su vez como un manejador de colas que apila peticiones de threads que intentan adquirir y liberar permisos. Cada invocación al método throttle, intenta primero adquirir un permiso del semáforo para después llamar a la clase Command que simula la llamada al servicio externo. Cada vez que se adquiere, el número de permisos disponibles decrece y cuando la llamada a la clase Command termina, se libera el permiso adquirido incrementado el total de permisos disponibles. Cuando ya no hay permisos disponibles y un thread intenta adquirirlo, quedará bloqueado hasta que algún thread en ejecución libere su permiso o pase un determinado tiempo.  

 

A continuación se muestra la implementación de esta clase:

 

@Log4j2
@Component("semaphoreThrottlingService")
public class SemaphoreThrottleServiceImpl implements ThrottleService {

    @Autowired
    private Command command;
    
    private Semaphore semaphore = new Semaphore(Constants.MAX_CONCURRENT_OPERATIONS);
    
    @Override
    public String throttle(String param) {

        try {
            semaphore.tryAcquire(10, TimeUnit.SECONDS);
            log.info("Entering request with param {}", param);
            String response = command.test(param);
            log.info("Exiting request with param {}", param);
            return response;
        } catch (InterruptedException e) {
            throw new ThrottleServiceException("Test");
        } finally {
            semaphore.release();
        }
        
    }
}

 

 

La desventaja de este método es que el número de permisos disponibles se configura en el momento de instanciación del semáforo por lo que para cambiarlo en tiempo de ejecución, habría que idear algún mecanismo para reemplazar la instancia usada pero en definitiva es efectivo y bastante simple de implementar.

 

 

 ThreadPoolExecutor de tamaño fijo

 

Este método se basa en un ThreadPoolExecutor de tamaño fijo. A la hora de instanciar la clase se define el número máximo de threads que se quiere manejar de manera concurrente. El ThreadPoolExecutor puede recibir un número ilimitado de tareas a ejecutar pero, en un momento dado, no tendrá más threads corriendo que el máximo especificado actuando al mismo tiempo como cola y semáforo. A diferencia del semáforo, no se necesita adquirir y liberar permisos, simplemente se envía la tarea a ejecutar al ThreadPoolExecutor y él se encarga de su administración. 

 

 

 

 

        

@Log4j2
@Component("threadThrottlingService")
public class ThreadPoolThrottleServiceImpl implements ThrottleService {

    // Instantiate the ExecutorService with the maximum number of concurrent tasks desired
    private static ExecutorService fixedExecutorServie = Executors.newFixedThreadPool(Constants.MAX_CONCURRENT_OPERATIONS);
    
    @Autowired
    private Command command;
    
    
    @Override
    public String throttle(String param) {
        Task task = new Task(param);
        
        // Submit a task. If the maximum number of tasks has been reached, it will be queued. 
        Future futureTask = fixedExecutorServie.submit(task);
            try {
                return futureTask.get(2, TimeUnit.MINUTES);
            } catch (InterruptedException | ExecutionException
                    | TimeoutException e) {
                futureTask.cancel(true);
                log.error("Error for task with param {}", param, e);
                throw new ThrottleServiceException(e);
            }
    }
    
    
    private class Task implements Callable{

        private Map<String, String> context = MapUtils.EMPTY_MAP;
        private String message;
        
        public Task(String message) {
            this.message = message;
            context = ThreadContext.getImmutableContext();
        }
        
        @Override
        public String call() {
            ThreadContext.putAll(context);
            log.info("Entering request with param {}", message);
            String response = command.test(message);
            log.info("Finishing request with result {}", message);
            return response;
        }
    }

}

 

 La ventaja de este método es que no nos tenemos que preocupar por adquirir y liberar permisos de manera explícita, la desventaja es la misma que la del semáforo, habría que idear alguna manera de reemplazar la instancia para cambiar en tiempo de ejecución el número de threads concurrentes aceptados.

 

Broker con procesamiento distribuido

 

Esta es la propuesta más sofisticada en este artículo y la más compleja de implementar. El servicio a la hora de recibir un request, publica a través de la clase Producer un mensaje en el broker el cual es consumido por la clase Consumer que invoca la clase Command para llamar el servicio externo. Una vez recibido el mensaje, regresa a través del broker la respuesta a la clase Producer. Este patrón se conoce como RPC por sus siglas en inglés (Remote Procedure Call ) el cual consiste en invocar una función en un programa remoto y recibir un resultado. En la aplicación definimos el número máximo de consumidores procesando mensajes del broker de manera simultánea. El broker actúa como manejador de colas, y la aplicación, controlando el número concurrente de consumidores, actúa como semáforo. Mientras haya consumidores libres, los mensajes que lleguen al broker serán procesados inmediatamente pero, si todos los consumidores están ocupados, los mensajes que lleguen al broker serán puestos en espera hasta que se libere algún consumidor. 

En nuestro caso el consumidor está dentro del mismo servicio pero en realidad podría estar donde sea y esa es la principal ventaja sobre las otras dos opciones: Soporte del procesamiento distribuido.

Mediante el mecanismo de publicación/subscripción, la realización de la llamada al servicio externo no necesariamente tiene que estar dentro del mismo servicio sino que podría existir en una aplicación externa. Más aún, si tenemos varias instancias de nuestro ThrottleService corriendo, podemos distribuir la carga a través de ellos (nótese que no se trata de la carga distribuida a través de un balanceador de carga enfrente de los servicios sino, una vez que la llamada ya llegó al sistema, podemos distribuir las llamadas al servicio externo a través de las distintas instancias que tenemos corriendo). 

Otra de las ventajas que tiene es que, al menos para este ejemplo que utiliza RabbitMQ y la implementación de AMQP de Spring para interactuar con él, el número de consumidores se puede alterar en tiempo de ejecución. De igual manera, el TTL de los mensajes se le delega al broker y no necesitamos manejarlo de manera explícita en el servicio como en los dos ejemplos anteriores. 

 

A continuación se presenta un diagrama de secuencia de lo explicado anteriormente:

 

 

 

Como ya mencionamos, nuestra implementación usa RabbitMQ como broker y la implementación de Spring del protocolo AMQP la cuál ofrece de manera transparente para nosotros la implementación del patrón RPC por lo que el código resulta muy sencillo. A continuación se muestra el código de las clases involucradas en esto. Ya que el ejemplo utiliza Spring Boot, la configuración del número máximo de consumidores concurrentes se tiene en el archivo application.properties en la propiedad spring.rabbitmq.listener.simple.concurrency.  Este valor se podría cambiar en tiempo de ejecución accediendo al método setConcurrentConsumers del MessageListenerContainer utilizado.

 

 DistributedThrottleServiceImpl

@Log4j2
@Component("distributedThrottlingService")
public class DistributedThrottleServiceImpl implements ThrottleService {


    @Autowired
    private Producer producer;
    
    @Value("${queue}")
    String queue;
    
    @Override
    public String throttle(String param) {
        log.info("Entering request with param {}", param);
        String response = producer.sendTo(queue, param);
        log.info("Exiting request with param {}", param);
        return response;
    }
    
}

 

 Producer

@Component
@Log4j2
public class Producer {

    @Autowired(required=false)
    private RabbitTemplate rabbitTemplate;
    
    public String sendTo(String routingkey,String message){
        String response = (String)this.rabbitTemplate.convertSendAndReceive(routingkey,message);
        log.info("Sent-> ...{}, Received: {}", message, response);
        return response;
    }    
}

 Consumer

@Component
@Log4j2
public class Consumer {

    @Autowired
    private Command command;
    
    @RabbitListener(queues="${queue}")
    public String handler(String message){
        log.info("Consuming: " + message);
        return command.test(message); 
    }
}

 

 

Detalles de implementación

  • El proyecto está desarrollado en Java 8.
  • Utiliza Spring Boot con un servidor Tomcat incrustado escuchando en el puerto 8080.
  • Para el servicio REST, se utiliza la implementación JAX-RS de Apache CXF.
  • Como broker utilizamos RabbitMQ y la implementación del protocolo AMQP de Spring para accederlo por lo que debe estar instalado y corriendo ya que la aplicación apunta por default a localhost. Si tienen Docker, ejecuten el siguiente comando: "docker run -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management".
  • Para poder monitorear el número de threads en ejecución en un momento dado, utilizamos Hystrix para envolver las llamadas a la clase Command y exponemos las métricas del thread pool del comando Hystrix mediante el dashboard que provee el framework de Netflix.
  • El dashboard de Hystrix se encuentra accesible en http://localhost:8080/hystrix/monitor?stream=http%3A%2F%2Flocalhost%3A8080%2Fhystrix.stream%2F
  • Los url para probar los distintos mecanismos son: http://localhost:8080/throttle-service/throttle/semaphore/param , http://localhost:8080/throttle-service/throttle/distributed/param y http://localhost:8080/throttle-service/throttle/threaded/param (Se puede sustituir param por cualquier cadena). 
  • Se muestra una implementación de un filtro que asegura que se tenga un correlationId para rastrear en los logs mensajes asociados a un request en particular. 
  • Para construir y correr la aplicación, ejecutar "mvn clean package" seguido por "java -jar throttle-demo-0.0.1-SNAPSHOT.jar "
  • La aplicación, por default, tiene deshabilitado el soporte del mecanismo que utiliza RabbitMQ. Para habilitarlo, se debe activar el perfil "rabbit", por ejemplo: java -jar -Dspring.profiles.active=rabbit throttle-demo-0.0.1-SNAPSHOT.jar 
  • La aplicación también provee una tarea que se ejecuta cada segundo para probar el mecanismo distribuido bajo el perfil "scheduler". Para usarla ejecutar el siguiente comando: java -jar -Dspring.profiles.active=scheduler,rabbit throttle-demo-0.0.1-SNAPSHOT.jar 

La siguiente imagen muestra la aplicación corriendo con una configuración de 2 threads máximo en el mecanismo de semáforo recibiendo 5 requests simultáneos. Como se puede apreciar, el ThreadPool tiene una cantidad de 2 en MaxActive y Active mientras que el tamaño de la cola es de 5 (los 5 requests recibidos, 2 de los cuales están siendo procesados mientras los otros 3 están en espera):

 

 

 

 

 

 

El código fuente se encuentra en este repositorio git.

For English speaking people, you can download the source code here.

 

 

Add comment


Security code
Refresh

contacts Contactanos

 

bugs Reportar bugs

about Acerca de www.tecnohobby.net

Go to top