Package io.mats3.util

Class MatsFuturizer

java.lang.Object
io.mats3.util.MatsFuturizer
All Implemented Interfaces:
AutoCloseable

public class MatsFuturizer extends Object implements AutoCloseable
An instance of this class acts as a bridge service between the synchronous world of e.g. a HTTP request, and the asynchronous world of Mats. In a given project, you typically create a single instance of this class upon startup, and employ it for all such scenarios. In short, in a HTTP service handler, you initialize a Mats flow using singletonFuturizer.futurizeNonessential(...) (or futurize(...) for full configurability), specifying which Mats Endpoint to invoke and the request DTO instance, and then you get a CompletableFuture in return. This future will complete once the invoked Mats Endpoint replies.

It is extremely important to understand that this is NOT how you compose multiple Mats Endpoints together! This is ONLY supposed to be used when you are in a synchronous context (e.g. in a Servlet, or a Spring @RequestMapping), and want to interact with the Mats fabric of services.

A question you should ask yourself, is how this works in a multi-node setup? For a Mats flow, it does not matter which node a given stage of a MatsEndpoint is performed, as it is by design totally stateless wrt. the executing node, as all state resides in the message. However, for a synchronous situation as in a HTTP request, it definitely matters that the final reply, the one that should complete the returned future, comes in on the same node that issued the request, as this is where the CompletableFuture instance is, and where the waiting TCP connection is connected! The trick here is that the final reply is specified to come in on a node-specific Topic, i.e. it literally has the node name (default being the hostname) as a part of the MatsEndpoint name, and it is a SubscriptionTerminator.

Another aspect to understand, is that while Mats "guarantees" that a submitted initiation will flow through the Mats endpoints, no matter what happens with the processing nodes (unless you employ NonPersistent messaging, which futurizeNonessential(..) does!), nothing can be guaranteed wrt. the completion of the future: This is stateful processing. The node where the MatsFuturizer initiation is performed can crash right after the message has been put on the Mats fabric, and hence the CompletableFuture vanishes along with everything else on that node. The mats flow is however already in motion, and will be executed - but when the Reply comes in on the node-specific Topic, there is no longer any corresponding CompletableFuture to complete. This is also why you should not compose Mats endpoints using this familiar feeling that a CompletableFuture probably gives you: While a multi-stage MatsEndpoint is asynchronous, resilient and highly available and each stage is transactionally performed, with retries and all the goodness that comes with a message oriented architecture, once you rely on a CompletableFuture, you are in a synchronous world where a power outage or a reboot can stop the processing midway. Thus, the MatsFuturizer should always just be employed out the very outer edge facing the actual client - any other processing should be performed using MatsEndpoints, and composition of MatsEndpoints should be done using multi-stage MatsEndpoints.

Note that in the case of pure "GET-style" requests where information is only retrieved and no state in the total system is changed, everything is a bit more relaxed: If a processing fails, the worst thing that happens is a slightly annoyed user. But if this was an "add order" or "move money" instruction from the user, a mid-processing failure is rather bad and could require human intervention to clean up. Thus, the futurizeNonessential(..) method should only be employed for such "GET-style" requests, and any other potentially state changing operations must employ the generic futurize(..) method.

  • Field Details

    • _matsFactory

      protected final MatsFactory _matsFactory
    • _matsInitiator

      protected final MatsInitiator _matsInitiator
    • _terminatorEndpointId

      protected final String _terminatorEndpointId
    • _futureCompleterThreadPool

      protected final ThreadPoolExecutor _futureCompleterThreadPool
    • _maxOutstandingPromises

      protected final int _maxOutstandingPromises
    • _replyHandlerEndpoint

      protected final MatsEndpoint<Void,String> _replyHandlerEndpoint
    • _threadNumber

      protected final AtomicInteger _threadNumber
    • _replyHandlerEndpointStarted

      protected volatile boolean _replyHandlerEndpointStarted
    • _internalStateLock

      protected final ReentrantLock _internalStateLock
    • _timeouterPing_InternalStateLock

      protected final Condition _timeouterPing_InternalStateLock
    • _correlationIdToPromiseMap

      protected final HashMap<String,MatsFuturizer.Promise<?>> _correlationIdToPromiseMap
    • _timeoutSortedPromises

      protected final PriorityQueue<MatsFuturizer.Promise<?>> _timeoutSortedPromises
    • _nextInLineToTimeout

      protected MatsFuturizer.Promise<?> _nextInLineToTimeout
    • _runFlag

      protected volatile boolean _runFlag
  • Constructor Details

    • MatsFuturizer

      protected MatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
  • Method Details

    • createMatsFuturizer

      public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory)
      Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). The defaults for the parameters from the fully fledged factory method are identical to the createMatsFuturizer(MatsFactory, String), but with this variant also the 'endpointIdPrefix' is set to what is returned by matsFactory.getFactoryConfig().getAppName(). Note that if you - against the above suggestion - create more than one MatsFuturizer for a MatsFactory, then you MUST give them different endpointIdPrefixes, thus you cannot use this method!
      Parameters:
      matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
      Returns:
      the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
    • createMatsFuturizer

      public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix)
      Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). The number of threads in the future-completer-pool is what matsFactory.getFactoryConfig().getConcurrency() returns at creation time x 4 for "corePoolSize", but at least 5, (i.e. "min"); and concurrency * 20, but at least 100, for "maximumPoolSize" (i.e. max). The pool is set up to let non-core threads expire after 5 minutes. The maximum number of outstanding promises is set to 50k.
      Parameters:
      matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
      endpointIdPrefix - the first part of the endpointId, which typically should be some "class-like" construct denoting the service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for a MatsFactory, this parameter must be different for each instance!
      Returns:
      the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
    • createMatsFuturizer

      public static MatsFuturizer createMatsFuturizer(MatsFactory matsFactory, String endpointIdPrefix, int corePoolSize, int maxPoolSize, int maxOutstandingPromises)
      Creates a MatsFuturizer, and you should only need one per MatsFactory (which again mostly means one per application or micro-service or JVM). With this factory method you can specify the number of threads in the future-completer-pool with the parameters "corePoolSize" and "maxPoolSize" threads, which effectively means min and max. The pool is set up to let non-core threads expire after 5 minutes. You must also specify the max number of outstanding promises, if you want no effective limit, use Integer.MAX_VALUE.
      Parameters:
      matsFactory - the underlying MatsFactory on which outgoing messages will be sent, and on which the receiving SubscriptionTerminator will be created.
      endpointIdPrefix - the first part of the endpointId, which typically should be some "class-like" construct denoting the service name, like "OrderService" or "InventoryService", preferably the same prefix you use for all your other endpoints running on this same service. Note: If you create multiple MatsFuturizers for a MatsFactory, this parameter must be different for each instance!
      corePoolSize - the minimum number of threads in the future-completer-pool of threads.
      maxPoolSize - the maximum number of threads in the future-completer-pool of threads.
      maxOutstandingPromises - the maximum number of outstanding Promises before new are rejected. Should be a fairly high number, e.g. the default of createMatsFuturizer(MatsFactory, String) is 50k.
      Returns:
      the MatsFuturizer, which is tied to a newly created SubscriptionTerminator.
    • futurize

      public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurize(String traceId, String from, String to, int timeout, TimeUnit unit, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit)
      The generic form of initiating a request-message that returns a CompletableFuture, which enables you to tailor all properties. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any "sideloads" to the outgoing message, use the "customInit" parameter, which directly is the InitiateLambda that the MatsFuturizer initiation is using.

      For a bit more explanation, please read JavaDoc of futurizeInteractiveUnreliable(..)

      Type Parameters:
      T - the type of the reply DTO.
      Parameters:
      traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
      from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
      to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
      timeout - how long before the internal timeout-mechanism of MatsFuturizer kicks in and the future is completed exceptionally with a MatsFuturizer.MatsFuturizerTimeoutException.
      unit - the unit of time of the 'timeout' parameter.
      replyClass - which expected reply DTO class that the requested endpoint replies with.
      request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
      customInit - the MatsInitiator.InitiateLambda that the MatsFuturizer is employing to initiate the outgoing message, which you can use to tailor the message, e.g. setting the interactive-flag or tacking on "sideloads".
      Returns:
      a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
    • futurize

      public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurize(String traceId, String from, String to, Class<T> replyClass, Object request, MatsInitiator.InitiateLambda customInit)
      Convenience-variant of the generic futurize(..) form, where the timeout is set to 2.5 minutes. To set interactive-, nonPersistent- or noAudit-flags, or to tack on any "sideloads" to the outgoing message, use the "customInit" parameter, which directly is the InitiateLambda that the MatsFuturizer initiation is using.

      For a bit more explanation, please read JavaDoc of futurizeInteractiveUnreliable(..)

      Type Parameters:
      T - the type of the reply DTO.
      Parameters:
      traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
      from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
      to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String) the unit of time of the 'timeout' parameter.
      replyClass - which expected reply DTO class that the requested endpoint replies with.
      request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
      customInit - the MatsInitiator.InitiateLambda that the MatsFuturizer is employing to initiate the outgoing message, which you can use to tailor the message, e.g. setting the interactive-flag or tacking on "sideloads".
      Returns:
      a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
    • futurizeNonessential

      public <T> CompletableFuture<MatsFuturizer.Reply<T>> futurizeNonessential(String traceId, String from, String to, Class<T> replyClass, Object request)
      NOTICE: This variant must only be used for "GET-style" Requests where none of the endpoints the call flow passes will add, remove or alter any state of the system, and where it doesn't matter all that much if a message (and hence the Mats flow) is lost!

      The goal of this method is to be able to get hold of e.g. account holdings, order statuses etc, for presentation to a user. The thinking is that if such a flow fails where a message of the call flow disappears, this won't make for anything else than a bit annoyed user: No important state change, like the adding, deleting or change of an order, will be lost. Also, speed is of the essence. Therefore, non-persistent. At the same time, to make the user super happy in the ordinary circumstances, all messages in this call flow will be prioritized, and thus skip any queue backlogs that have arose on any of the call flow's endpoints, e.g. due to some massive batch of (background) processes executing at the same time. Therefore, interactive. Notice that with both of these features combined, you get very fast messaging, as non-persistent means that the message will not have to be stored to permanent storage at any point, while interactive means that it will skip any backlogged queues. In addition, the noAudit flag is set, since it is a waste of storage space to archive the actual contents of Request and Reply messages that do not alter the system.

      Sets the following properties on the sent Mats message:

      • Non-persistent: Since it is not vitally important that this message is not lost, non-persistent messaging can be used. The minuscule chance for this message to disappear is not worth the considerable overhead of store-and-forward multiple times to persistent storage. Also, speed is much more interesting.
      • Interactive: Since the Futurizer should only be used as a "synchronous bridge" when a human is actively waiting for the response, the interactive flag is set. (For all other users, you should rather code "proper Mats" with initiations, endpoints and terminators).
      • No audit: Since this message will not change the state of the system (i.e. the "GET-style" requests), using storage on auditing requests and replies is not worthwhile.
      This method initiates an non-persistent (unreliable), interactive (prioritized), non-audited (request and reply DTOs won't be archived) Request-message to the specified endpoint, returning a CompletableFuture that will be completed when the Reply from the requested endpoint comes back. The internal MatsFuturizer timeout will be set to 2.5 minutes, meaning that if there is no reply forthcoming within that time, the CompletableFuture will be completed exceptionally with a MatsFuturizerTimeoutException, and the Promise deleted from the futurizer. 2.5 minutes is probably too long to wait for any normal interaction with a system, so if you use the CompletableFuture.get(timeout, TimeUnit) method of the returned future, you might want to put a lower timeout there - if the answer hasn't come within that time, you'll get a TimeoutException. If you instead use the non-param variant get(), you will get an ExecutionException when the 2.5 minutes have passed (that exception's cause will be the MatsFuturizerTimeoutException mentioned above).
      Type Parameters:
      T - the type of the reply DTO.
      Parameters:
      traceId - TraceId of the resulting Mats call flow, see MatsInitiator.MatsInitiate.traceId(CharSequence)
      from - the "from" of the initiation, see MatsInitiator.MatsInitiate.from(String)
      to - to which Mats endpoint the request should go, see MatsInitiator.MatsInitiate.to(String)
      replyClass - which expected reply DTO class that the requested endpoint replies with.
      request - the request DTO that should be sent to the endpoint, see MatsInitiator.MatsInitiate.request(Object)
      Returns:
      a CompletableFuture which will be resolved with a MatsFuturizer.Reply-instance that contains both some meta-data, and the reply from the requested endpoint.
    • getOutstandingPromiseCount

      public int getOutstandingPromiseCount()
      Returns:
      the number of outstanding promises, not yet completed or timed out.
    • getCompleterThreadPool

      public ThreadPoolExecutor getCompleterThreadPool()
      Returns:
      the future-completer-thread-pool, for introspection. If you mess with it, you will be sorry..!
    • _newThreadPool

      protected ThreadPoolExecutor _newThreadPool(int corePoolSize, int maximumPoolSize)
    • _createPromise

      protected <T> MatsFuturizer.Promise<T> _createPromise(String traceId, String from, String to, Class<T> replyClass, int timeout, TimeUnit unit)
    • _enqueuePromise

      protected <T> void _enqueuePromise(MatsFuturizer.Promise<T> promise)
    • _assertFuturizerRunning

      protected void _assertFuturizerRunning()
    • _sendRequestToFulfillPromise

      protected <T> void _sendRequestToFulfillPromise(String from, String endpointId, String traceId, Object request, MatsInitiator.InitiateLambda extraMessageInit, MatsFuturizer.Promise<T> promise)
    • _handleRepliesForPromises

      protected void _handleRepliesForPromises(MatsEndpoint.ProcessContext<Void> context, String correlationId, MatsEndpoint.MatsObject matsObject)
    • _deserializeReply

      protected Object _deserializeReply(MatsEndpoint.MatsObject matsObject, Class<?> toClass)
    • _uncheckedComplete

      protected void _uncheckedComplete(MatsEndpoint.ProcessContext<Void> context, Object replyObject, MatsFuturizer.Promise<?> promise)
    • _startTimeouterThread

      protected void _startTimeouterThread()
    • _timeoutCompleteExceptionally

      protected void _timeoutCompleteExceptionally(MatsFuturizer.Promise<?> promise, String msg)
    • close

      public void close()
      Closes the MatsFuturizer. Notice: Spring will also notice this method if the MatsFuturizer is registered as a @Bean, and will register it as a destroy method.
      Specified by:
      close in interface AutoCloseable