Class JmsMatsProcessContext<R,S,Z>

java.lang.Object
io.mats3.impl.jms.JmsMatsProcessContext<R,S,Z>
All Implemented Interfaces:
JmsMatsStatics, MatsEndpoint.DetachedProcessContext, MatsEndpoint.ProcessContext<R>

public class JmsMatsProcessContext<R,S,Z> extends Object implements MatsEndpoint.ProcessContext<R>, JmsMatsStatics
The JMS Mats implementation of MatsEndpoint.ProcessContext. An instance is created for each incoming JMS message that is processed, and is handed to the MatsStage's process lambda.
  • Method Details

    • getStageId

      public String getStageId()
      Specified by:
      getStageId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the stageId that is processed, i.e. the id of this stage. It will be equal to MatsEndpoint.DetachedProcessContext.getEndpointId() for the first stage of a multi-stage endpoint, and for the sole stage of a single-stage or terminator endpoint. Should probably never be necessary, but accessible for introspection.
    • getFromAppName

      public String getFromAppName()
      Specified by:
      getFromAppName in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the AppName of the MatsFactory from which the currently processing message came. Thus, if this message is the result of a 'next' call, it will be yourself.
    • getFromAppVersion

      public String getFromAppVersion()
      Specified by:
      getFromAppVersion in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the AppVersion of the MatsFactory from which the currently processing message came. Thus, if this message is the result of a 'next' call, it will be yourself.
    • getFromStageId

      public String getFromStageId()
      Specified by:
      getFromStageId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the stageId from which the currently processing message came. Note that the stageId of the initial stage of an endpoint is equal to the endpointId. If this endpoint is the initial target of the initiation, this value is equal to MatsEndpoint.DetachedProcessContext.getInitiatorId().
    • getFromTimestamp

      public Instant getFromTimestamp()
      Specified by:
      getFromTimestamp in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the Instant at which this message was created on the sending stage.
    • getInitiatingAppName

      public String getInitiatingAppName()
      Specified by:
      getInitiatingAppName in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the AppName of the MatsFactory that initiated the Flow which the currently processing message is a part of. Thus, if this endpoint is the initial target of the initiation, this value is equal to MatsEndpoint.DetachedProcessContext.getFromAppName().
    • getInitiatingAppVersion

      public String getInitiatingAppVersion()
      Specified by:
      getInitiatingAppVersion in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the AppVersion of the MatsFactory that initiated the Flow which the currently processing message is a part of. Thus, if this endpoint is the initial target of the initiation, this value is equal to MatsEndpoint.DetachedProcessContext.getFromAppVersion().
    • getInitiatorId

      public String getInitiatorId()
      Specified by:
      getInitiatorId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the "initiatorId" set by the initiation with MatsInitiator.MatsInitiate.from(String).
    • getInitiatingTimestamp

      public Instant getInitiatingTimestamp()
      Specified by:
      getInitiatingTimestamp in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the Instant at which this message was initiated, i.e. sent from a MatsInitiator (or within a Stage).
    • getMatsMessageId

      public String getMatsMessageId()
      Specified by:
      getMatsMessageId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the unique messageId for the incoming message from Mats - which can be used to catch double-deliveries.
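
      One way to use the Mats message id for catching double-deliveries is to record each id that has been handled and skip any repeat. The sketch below is only an illustration: the class name SeenMessageIds is hypothetical, and the in-memory set is an assumption made to keep the example self-contained. In a real stage you would more likely store the id in the same transactional database as the stage's other writes.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not part of Mats): remembers which Mats message ids have been handled. */
class SeenMessageIds {
    // Assumption: an in-memory set is enough for the illustration; it is lost on restart.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * @return true the first time an id is offered, false for any repeat.
     */
    boolean markIfFirstDelivery(String matsMessageId) {
        // Set.add returns false if the element was already present.
        return seen.add(matsMessageId);
    }
}
```

      Inside a process lambda you would pass the result of getMatsMessageId() to this method and skip the side effects when it returns false.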
    • getSystemMessageId

      public String getSystemMessageId()
      Specified by:
      getSystemMessageId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the unique messageId for the incoming message, from the underlying message system - which could be used to catch double-deliveries, but do prefer MatsEndpoint.DetachedProcessContext.getMatsMessageId(). (For a JMS Implementation, this will be the "JMSMessageID").
    • isNonPersistent

      public boolean isNonPersistent()
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      This is relevant if stashing, or otherwise when a stage is accessing an external system (e.g. another MQ) which has a notion of persistence.
      Specified by:
      isNonPersistent in interface MatsEndpoint.DetachedProcessContext
      Returns:
      whether the current Mats flow is non-persistent - read MatsInitiator.MatsInitiate.nonPersistent().
    • isInteractive

      public boolean isInteractive()
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      This is relevant if stashing, or otherwise when a stage is accessing an external system (e.g. another MQ) which has a notion of prioritization.
      Specified by:
      isInteractive in interface MatsEndpoint.DetachedProcessContext
      Returns:
      whether the current Mats flow is interactive (prioritized) - read MatsInitiator.MatsInitiate.interactive().
    • isNoAudit

      public boolean isNoAudit()
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      Hint to monitoring/logging/auditing systems that this call flow is not very valuable to fully audit, typically because it is just a "getter" of information for display to a user, or is a health check request to see if the endpoint is up and answers in a timely manner.
      Specified by:
      isNoAudit in interface MatsEndpoint.DetachedProcessContext
      Returns:
      whether the current Mats flow is "no audit" - read MatsInitiator.MatsInitiate.noAudit().
    • getBytesKeys

      public Set<String> getBytesKeys()
      Specified by:
      getBytesKeys in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the keys on which bytes live.
    • toString

      public String toString()
      Specified by:
      toString in interface MatsEndpoint.DetachedProcessContext
      Overrides:
      toString in class Object
      Returns:
      a for-human-consumption, multi-line debug-String representing the current processing context, typically the "MatsTrace" up to the current stage. The format is utterly arbitrary, can and will change between versions and revisions, and shall NOT be used programmatically!!
    • getTraceId

      public String getTraceId()
      Specified by:
      getTraceId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the trace id for the processed message.
    • getEndpointId

      public String getEndpointId()
      Specified by:
      getEndpointId in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the endpointId that is processed, i.e. the id of this endpoint. Should probably never be necessary, but accessible for introspection.
    • getBytes

      public byte[] getBytes(String key)
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      Get binary "sideloads" from the incoming message.
      Specified by:
      getBytes in interface MatsEndpoint.DetachedProcessContext
      Parameters:
      key - the key for which to retrieve a binary payload from the incoming message.
      Returns:
      the requested byte array.
    • getStringKeys

      public Set<String> getStringKeys()
      Specified by:
      getStringKeys in interface MatsEndpoint.DetachedProcessContext
      Returns:
      the keys on which strings live.
    • getString

      public String getString(String key)
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      Get String "sideloads" from the incoming message.
      Specified by:
      getString in interface MatsEndpoint.DetachedProcessContext
      Parameters:
      key - the key for which to retrieve a String payload from the incoming message.
      Returns:
      the requested String.
    • addBytes

      public void addBytes(String key, byte[] payload)
      Description copied from interface: MatsEndpoint.ProcessContext
      Attaches a binary payload ("sideload") to the next outgoing message, be it a request or a reply. Note that for initiations, you have the same method on the MatsInitiator.MatsInitiate instance.

      The rationale for having this is to not have to encode a largish byte array inside the JSON structure that carries the Request or Reply DTO - byte arrays are represented very poorly in JSON.

      Note: The byte array is not compressed (as might happen with the DTO), so if the payload is large, you might want to consider compressing it before attaching it (and will then have to decompress it on the receiving side).

      Note: This will be added to the subsequent request, reply or next message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Any initiations are separate from this, neither getting nor consuming binaries, strings nor trace properties set on the ProcessContext - they must be set on the MatsInitiate instance within the initiate-lambda).
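
      The compression mentioned in the first note could be done with the JDK's GZIP streams before calling addBytes(..), and reversed on the receiving side after getBytes(..). The following is a minimal sketch; the class name SideloadCompression is hypothetical and GZIP is just one possible choice of codec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper for compressing sideloads; not part of Mats. */
class SideloadCompression {
    /** Compresses the payload with GZIP, e.g. before it is handed to addBytes(key, ..). */
    static byte[] compress(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(payload);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The GZIP stream has been closed, so the trailer is written.
        return out.toByteArray();
    }

    /** Reverses compress(..), e.g. on the bytes obtained from getBytes(key). */
    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```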

      Specified by:
      addBytes in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      key - the key on which to store the byte array payload. The receiver will have to use this key to get the payload out again, so either it will be a specific key that the sender and receiver agree upon, or you could generate a random key, and reference this key as a field in the outgoing DTO.
      payload - the payload to store.
    • addString

      public void addString(String key, String payload)
      Description copied from interface: MatsEndpoint.ProcessContext
      Attaches a String payload ("sideload") to the next outgoing message, be it a request or a reply. Note that for initiations, you have the same method on the MatsInitiator.MatsInitiate instance.

      The rationale for having this is to not have to encode a largish string document inside the JSON structure that carries the Request or Reply DTO.

      Note: The String payload is not compressed (as might happen with the DTO), so if the payload is large, you might want to consider compressing it before attaching it and instead use the addBytes(..) method (and will then have to decompress it on the receiving side).

      Note: This will be added to the subsequent request, reply or next message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Any initiations are separate from this, neither getting nor consuming binaries, strings nor trace properties set on the ProcessContext - they must be set on the MatsInitiate instance within the initiate-lambda).

      Specified by:
      addString in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      key - the key on which to store the String payload. The receiver will have to use this key to get the payload out again, so either it will be a specific key that the sender and receiver agree upon, or you could generate a random key, and reference this key as a field in the outgoing DTO.
      payload - the payload to store.
    • setTraceProperty

      public void setTraceProperty(String propertyName, Object propertyValue)
      Description copied from interface: MatsEndpoint.ProcessContext
      Adds a property that will "stick" with the Mats Trace from this call on out. Note that for initiations, you have the same method on the MatsInitiator.MatsInitiate instance. The functionality effectively acts like a ThreadLocal when compared to normal java method invocations: If the Initiator adds it, all subsequent stages will see it, on any stack level, including the terminator. If a stage in a service nested some levels down in the stack adds it, it will be present in all subsequent stages including all the way to the Terminator. Note that any initiations within a Stage will also inherit trace properties present on the Stage's incoming message.

      Possible use cases: You can for example "sneak along" some property meant for Service X through an invocation of intermediate Service A (which subsequently calls Service X), where the signature (DTO) of the intermediate Service A does not provide such functionality. Another usage would be to add some "global context variable", e.g. "current user", that is available for any down-stream Service that requires it. Both of these scenarios can obviously lead to pretty hard-to-understand code if used extensively: When employed, you should code rather defensively, where if this property is not present when a stage needs it, it should throw MatsEndpoint.MatsRefuseMessageException and clearly explain that the property needs to be present.

      Note: This will be added to the subsequent request, reply or next message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Any initiations are separate from this, neither getting nor consuming binaries, strings nor trace properties set on the ProcessContext - they must be set on the MatsInitiate instance within the initiate-lambda).

      Note: incoming trace properties (that were present on the incoming message) will be added to all outgoing messages, including initiations within the stage.
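
      The defensive style recommended above could look roughly like the sketch below. To stay self-contained it uses two stand-ins that are not Mats types: TracePropertyReader mirrors only the getTraceProperty(String, Class) signature, and RefuseMessage takes the place of MatsEndpoint.MatsRefuseMessageException. It also assumes that an absent property is reported as null.

```java
/** Stand-in for the one context method used here; hypothetical, not the Mats type. */
interface TracePropertyReader {
    <T> T getTraceProperty(String propertyName, Class<T> clazz);
}

/** Stand-in for MatsEndpoint.MatsRefuseMessageException, to keep the sketch self-contained. */
class RefuseMessage extends RuntimeException {
    RefuseMessage(String message) {
        super(message);
    }
}

class RequiredTraceProperty {
    /** Assumption: an absent trace property is reported as null. */
    static <T> T require(TracePropertyReader context, String name, Class<T> type) {
        T value = context.getTraceProperty(name, type);
        if (value == null) {
            // Explain clearly what is missing, as the text above recommends.
            throw new RefuseMessage("Missing required trace property '" + name + "' of type "
                    + type.getSimpleName() + "; the initiator or an upstream stage must set it.");
        }
        return value;
    }
}
```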

      Specified by:
      setTraceProperty in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      propertyName - the name of the property
      propertyValue - the value of the property, which will be serialized using the active MATS serializer.
    • logMeasurement

      public void logMeasurement(String metricId, String metricDescription, String baseUnit, double measure, String... labelKeyValue)
      Description copied from interface: MatsEndpoint.ProcessContext
      Adds a measurement of a described variable, in a base unit, for this Stage - be sure to understand that the three String parameters are constants for each measurement. To exemplify, you may measure five different things in a Stage, e.g. "number of items in order", "total amount for order in dollar", etc - and each of these obviously has a different metricId and metricDescription, and possibly a different baseUnit, from the others. BUT, the specific arguments for "number of items in order" (outside the measure itself!) shall not change between one Stage processing and the next: e.g. the metricDescription shall NOT be dynamically constructed to e.g. say "Number of items in order 1234 for customer 5678".

      Note: It is illegal to use the same 'metricId' for more than one measurement for a given stage, and this also goes between measurements and timing measurements.

      Inclusion as metric by plugin 'mats-intercept-micrometer': A new meter will be created (and cached), of type DistributionSummary, with the 'name' set to "mats.exec.ops.measure.{metricId}.{baseUnit}" ("measure"->"time" for timings), and 'description' to description. (Tags/labels already added on the meter by the plugin include 'appName', 'initiatorId', 'initiatingAppName', 'stageId' (for stages), and 'initiatorName' (for inits)). Read about parameter 'labelKeyValue' below.

      Inclusion as log line by plugin 'mats-intercept-logging': A log line will be output for each added measurement, where the MDC for that log line will have an entry with key "mats.ops.measure.{metricId}.{baseUnit}". Read about parameter 'labelKeyValue' below.

      It generally makes most sense if the same metrics are added for each processing of a particular Stage, i.e. if the "number of items" is 0, then that should also be recorded along with the "total amount for order in dollar" as 0, not just elided. Otherwise, your metrics will be skewed.

      You should use a dot-notation for the metricId if you want to add multiple meters with a hierarchical/subdivision layout.

      The vararg 'labelKeyValue' is an optional element where the String-array consists of one or more alternating key, value pairs. Do not employ this feature unless you know what the effects are, and you actually need it! These will be added as labels/tags to the metric, and added to the SLF4J MDC for the measurement log line with the key being "mats.ops.measure.{metricId}.{labelKey}" ("measure"->"time" for timings). The keys should be constants as explained for the other parameters, while the value can change, but only within a given set of values (think enum) - using e.g. the 'customerId' as value doesn't make sense and will blow up your metric cardinality. Notice that if you do employ e.g. two labels, each having one of three values, you'll effectively create 9 different meters, where your measurement will go to one of them.

      NOTICE: If you want to do a timing, then instead use MatsEndpoint.ProcessContext.logTimingMeasurement(String, String, long, String...)
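
      To make the naming and cardinality rules above concrete, the sketch below builds the described meter name and MDC key from constant arguments, turns the alternating 'labelKeyValue' vararg into a map, and computes how many meters a set of labels implies. It only illustrates the rules as stated here and is not the plugins' actual code; the class MeasurementNaming and the choice to reject an odd number of label strings are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration of the naming rules described above; hypothetical, not the plugins' code. */
class MeasurementNaming {
    /** Meter name as described for 'mats-intercept-micrometer'. */
    static String meterName(String metricId, String baseUnit) {
        return "mats.exec.ops.measure." + metricId + "." + baseUnit;
    }

    /** MDC key as described for 'mats-intercept-logging'. */
    static String mdcKey(String metricId, String baseUnit) {
        return "mats.ops.measure." + metricId + "." + baseUnit;
    }

    /** Turns the alternating key, value vararg into an ordered map (assumption: odd counts are rejected). */
    static Map<String, String> labels(String... labelKeyValue) {
        if (labelKeyValue.length % 2 != 0) {
            throw new IllegalArgumentException("labelKeyValue must hold key, value pairs");
        }
        Map<String, String> labels = new LinkedHashMap<>();
        for (int i = 0; i < labelKeyValue.length; i += 2) {
            labels.put(labelKeyValue[i], labelKeyValue[i + 1]);
        }
        return labels;
    }

    /** Number of distinct meters: the product of the number of possible values per label. */
    static long meterCount(int... valuesPerLabel) {
        long count = 1;
        for (int values : valuesPerLabel) {
            count *= values;
        }
        return count;
    }
}
```

      With two labels of three possible values each, meterCount(3, 3) gives the 9 meters mentioned above.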

      Specified by:
      logMeasurement in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      metricId - constant, short, possibly dot-separated if hierarchical, id for this particular metric, e.g. "items" or "amount", or "db.query.orders".
      metricDescription - constant, textual description for this metric, e.g. "Number of items in customer order", "Total amount of customer order"
      baseUnit - the unit for this measurement, e.g. "quantity" (for a count measure), "dollar" (for an amount), or "bytes" (for a document size).
      measure - value of the measurement
      labelKeyValue - a String-vararg array consisting of alternating key, value pairs which will become labels or tags or entries for the metrics and log lines. Read the JavaDoc above; the keys shall be "static" for a specific measure, while the values can change within a specific small set of values.
    • logTimingMeasurement

      public void logTimingMeasurement(String metricId, String metricDescription, long nanos, String... labelKeyValue)
      Description copied from interface: MatsEndpoint.ProcessContext
      Same as logMeasurement(..), but specifically for timings - Read that JavaDoc!

      Note: It is illegal to use the same 'metricId' for more than one measurement for a given stage, and this also goes between timing measurements and measurements.

      For the metrics plugin 'mats-intercept-micrometer', the 'baseUnit' argument is deduced to whatever is appropriate for the receiving metrics system, e.g. for Prometheus it is "seconds", even though you always record the measurement in nanoseconds using this method.

      For the logging plugin 'mats-intercept-logging', the timing in the log line will be in milliseconds (with fractions), even though you always record the measurement in nanoseconds using this method.
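
      A timing in nanoseconds is typically captured with System.nanoTime(). The sketch below shows one way to do that, plus the unit conversions mentioned above. The class TimingSketch and its method names are hypothetical; the sink parameter stands in for a call to logTimingMeasurement(..).

```java
import java.util.function.LongConsumer;

/** Hypothetical helper showing how a nanosecond timing could be captured and converted. */
class TimingSketch {
    /** Runs the task and hands the elapsed nanoseconds to the sink, also if the task throws. */
    static void timed(Runnable task, LongConsumer nanosSink) {
        long start = System.nanoTime();
        try {
            task.run();
        }
        finally {
            nanosSink.accept(System.nanoTime() - start);
        }
    }

    /** Nanoseconds to milliseconds with fractions, the unit described for the log line. */
    static double toMillis(long nanos) {
        return nanos / 1_000_000.0;
    }

    /** Nanoseconds to seconds, the unit described for Prometheus. */
    static double toSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }
}
```

      In a stage, the sink could be a lambda such as nanos -> context.logTimingMeasurement("db.query.orders", "Time taken to execute order query", nanos).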

      Specified by:
      logTimingMeasurement in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      metricId - constant, short, possibly dot-separated if hierarchical, id for this particular metric, e.g. "db.query.orders" or "calcprofit".
      metricDescription - constant, textual description for this metric, e.g. "Time taken to execute order query", "Time taken to calculate profit or loss".
      nanos - time taken in nanoseconds
      labelKeyValue - a String-vararg array consisting of alternating key, value pairs which will become labels or tags or entries for the metrics and log lines. Read the JavaDoc at logMeasurement(..)
    • stash

      public byte[] stash()
      Description copied from interface: MatsEndpoint.ProcessContext
      Returns a binary representation of the current Mats flow's incoming execution point, which can be unstashed again at a later time using the MatsInitiator, thereby providing a simplistic "continuation" feature in Mats. You will have to find storage for these bytes yourself - an obvious place is the co-transactional database that the stage typically has available.

      This feature gives the ability to "pause" the current Mats flow, and later restore the execution from where it left off, probably with some new information that has been gathered in the meantime. This can typically relieve the Mats Stage Processing thread from having to wait for another service's execution (whose execution must then be handled by some other thread). This could be a longer running process, or a process whose execution time is variable, maybe residing on a Mats-external service structure: E.g. some REST service that sometimes lags, or is sometimes down for short periods. Or a service on a different Message Broker.

      Once this "Mats external" processing has finished, that thread can invoke unstash(stashBytes,...) to get the Mats flow going again. Notice that functionally, the unstash-operation is a kind of initiation, only that this type of initiation doesn't start a new Mats flow, rather continuing an existing flow.

      Notice that this feature should not typically be used to "park" a Mats flow for days. One might have a situation where a part of an order flow potentially needs manual handling, e.g. validating a person's identity if this has not been validated before. It might (should!) be tempting to employ the stash function then: Stash the Mats flow in a database. Make a GUI where the ID-validation can be performed by some employee. When the ID is either accepted or denied, you unstash the Mats flow with the result, getting a very nice continuous Mats flow for new orders which is identical whether or not ID validation needs to be performed. However, if this ID-validation process can take days or weeks to execute, it will be a poor candidate for the stash-feature.

      The reason is that embedded within the execution context which you get a binary serialization of, there might be several serialized state representations of the endpoints lying upstream of this call flow. When you "freeze" these by invoking stash, you have immediately created a potential future deserialization crash if you change the code of those upstream endpoints (which quite probably reside in different code bases than the one employing the stash feature), specifically changes to the state classes they employ: When you deploy these code changes while having multiple flows frozen in stashes, you will have a problem when they are later unstashed and the Mats flow returns to those endpoints whose state classes won't deserialize anymore.

      It is worth noting that you always have these problems when doing deploys where the state classes of Mats endpoints change - it is just that usually, there won't be any, or at least not many, such flows in execution at the precise deploy moment (and also, changing the state classes is in practice really not that frequent). However, by stashing over days, instead of a normal Mats flow that takes seconds, you massively increase the time window in which such deserialization problems can occur. You at least have to consider this if employing the stash-functionality.

      Note about data and metadata which should be stored along with the stash-bytes: You only get a binary serialized incoming execution context in return from this method (which includes the incoming message, incoming state, the execution stack and trace properties, but not "sideloaded" bytes and strings). The returned byte array is utterly opaque seen from the Mats API side (however, depending on the serialization mechanism employed in the Mats implementation, you might be able to peek into it anyway - but this should at most be used for debugging/monitoring introspection). Therefore, any information from the incoming message, or from your state object, or anything else from the MatsEndpoint.DetachedProcessContext which is needed to actually execute the job that should be performed outside of the Mats flow, must be picked out manually before exiting the process lambda. This also goes for "sideloaded" objects (bytes and strings) - which will not be available inside the unstashed process lambda (they are not a part of the stash-bytes). Also, for debugging/monitoring purposes, you should store at least the Mats flow's TraceId and a timestamp along with the stash and data. You should probably also have some kind of monitoring / health checks for stashes that have become stale - i.e. stashes that have not been unstashed for a considerable time, and whose Mats flow has thus stopped up, and where the downstream endpoints/stages therefore will not get invoked.
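
      As an illustration of what could be stored next to the stash-bytes, the sketch below bundles the TraceId, a timestamp, the stash itself and the data needed by the external job, and offers a simple staleness check for monitoring. The class StashRecord and all of its field names are hypothetical; how and where you persist such a record is up to you.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical row to persist next to the stash bytes; field names are illustrative only. */
class StashRecord {
    final String traceId;     // from getTraceId(), for debugging and monitoring
    final Instant stashedAt;  // when stash() was invoked
    final byte[] stashBytes;  // the opaque result of stash()
    final String jobData;     // whatever the external job needs, picked out before the lambda exits

    StashRecord(String traceId, Instant stashedAt, byte[] stashBytes, String jobData) {
        this.traceId = traceId;
        this.stashedAt = stashedAt;
        this.stashBytes = stashBytes;
        this.jobData = jobData;
    }

    /** True if the stash has been waiting longer than the given limit. */
    boolean isStale(Instant now, Duration limit) {
        return Duration.between(stashedAt, now).compareTo(limit) > 0;
    }
}
```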

      Notes:

      • Invoking stash() will not affect the stage processing in any way other than producing a serialized representation of the current incoming execution point. You can still send out messages. You could even reply, but then, what would be the point of stashing?
      • Repeated invocations within the same stage will yield (effectively) the same stash, as any processing done inside the stage before invoking stash() doesn't affect the incoming execution point.
      • You will have to exit the current process lambda yourself - meaning that this cannot be used in a lastStage, as you cannot return from such a stage without actually sending a reply (return null replies with null). Instead employ a normal stage, using MatsEndpoint.ProcessContext.reply(Object) to return a reply if needed.
      • Mats won't care if you unstash() the same stash multiple times, but your downstream parts of the Mats flow might find this a bit strange.
      Specified by:
      stash in interface MatsEndpoint.ProcessContext<R>
      Returns:
      a binary representation of the current Mats flow's incoming execution point (i.e. any incoming state and the incoming message - along with the Mats flow stack at this point). It shall start with the 4 ASCII letters "MATS", and then 4 more letters representing which mechanism is employed to construct the rest of the byte array.
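
      The 8-byte header described above can be checked with a few lines of code, for instance when debugging stored stashes. This is a sketch under the stated layout only ("MATS" followed by 4 letters); the class StashHeader is hypothetical, and the mechanism ids used by actual implementations are not listed here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical inspector for the 8-byte header described above; for debugging only. */
class StashHeader {
    /** @return the 4 letters following "MATS", or null if the header does not match. */
    static String mechanismOf(byte[] stash) {
        if (stash == null || stash.length < 8) {
            return null;
        }
        String magic = new String(stash, 0, 4, StandardCharsets.US_ASCII);
        if (!"MATS".equals(magic)) {
            return null;
        }
        return new String(stash, 4, 4, StandardCharsets.US_ASCII);
    }
}
```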
    • getTraceProperty

      public <T> T getTraceProperty(String propertyName, Class<T> clazz)
      Description copied from interface: MatsEndpoint.DetachedProcessContext
      Retrieves the Mats Trace property with the specified name, deserializing the value to the specified class, using the active MATS serializer. Read more on MatsEndpoint.ProcessContext.setTraceProperty(String, Object).
      Specified by:
      getTraceProperty in interface MatsEndpoint.DetachedProcessContext
      Parameters:
      propertyName - the name of the Mats Trace property to retrieve.
      clazz - the class to which the value should be deserialized.
      Returns:
      the value of the Mats Trace property, deserialized as the specified class.
    • reply

      public MatsInitiator.MessageReference reply(Object replyDto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Sends a reply to the requesting service. When returning an object from a lastStage lambda, this is the method that actually gets invoked.

      This will be ignored if there is no endpointId on the stack, i.e. if this endpoint is semantically a terminator (the replyTo of an initiation's request), or if it is the last stage of an endpoint that was invoked directly (using MatsInitiate.send(msg)).

      It is possible to do "early return" in a multi-stage endpoint by invoking this method in a stage that is not the last.

      Note: Legal outgoing flows: Either one or several request messages; OR a single reply, next, nextDirect, or goTo message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint make very little sense, which is why only one reply is allowed, and it cannot be combined with request, next or goTo, as then the next stage could also perform a reply.

      Note: The current state and DTO are serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present on the reply-receiving stage.

      Specified by:
      reply in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      replyDto - the reply DTO to return to the invoker.
    • request

      public MatsInitiator.MessageReference request(String endpointId, Object requestDto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Sends a request message, meaning that the specified endpoint will be invoked, with the reply-to endpointId set to the next stage in the multi-stage endpoint.

      This will throw if the current process stage is a terminator, a single-stage endpoint or the last stage of a multi-stage endpoint, as there then is no next stage to reply to.

      Note: Legal outgoing flows: Either one or several request messages; OR a single reply, next, nextDirect, or goTo message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint make very little sense, which is why only one reply is allowed, and it cannot be combined with request, next or goTo, as then the next stage could also perform a reply.

      Note: The current state and DTO are serialized when invoking this method. This means that in case of multiple requests, you may change the state in between each request, and the next stage will get different "incoming states" for each of the replies, which may be of use in a scatter-gather scenario.

      Specified by:
      request in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      endpointId - which endpoint to invoke
      requestDto - the message that should be sent to the specified endpoint.
    • next

      public MatsInitiator.MessageReference next(Object nextDto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Sends a message which passes the control to the next stage of a multi-stage endpoint. The functionality is meant for doing conditional requests, e.g.:
       if (something missing) {
           request another endpoint with reply coming to next stage.
       }
       else {
           pass to next stage
       }
       
      This can be of utility if an endpoint in some situations requires more information from a collaborating endpoint, while in other situations it does not require that information.
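
      The conditional pattern above could be written roughly as follows. To run without a message broker, the sketch uses a stand-in interface OutgoingContext that mirrors only request(..) and next(..) (with simplified void return types) and a recording implementation; these types, the endpoint id "CustomerService.getName" and the method parameters are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the two context methods used here; hypothetical, not the Mats type. */
interface OutgoingContext {
    void request(String endpointId, Object requestDto);

    void next(Object nextDto);
}

/** Records calls so the sketch can run without a message broker. */
class RecordingContext implements OutgoingContext {
    final List<String> calls = new ArrayList<>();

    public void request(String endpointId, Object requestDto) {
        calls.add("request:" + endpointId);
    }

    public void next(Object nextDto) {
        calls.add("next:" + nextDto);
    }
}

class ConditionalNextStage {
    /** customerName is null when the information is missing and must be fetched. */
    static void process(OutgoingContext context, String customerId, String customerName) {
        if (customerName == null) {
            // The reply from the (hypothetical) endpoint arrives at the next stage.
            context.request("CustomerService.getName", customerId);
        }
        else {
            // Nothing to fetch: pass control to the next stage, which must handle a null DTO.
            context.next(null);
        }
    }
}
```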

      Note: You should rather use MatsEndpoint.ProcessContext.nextDirect(Object) if your transactional demarcation needs allow it!

      Note: Legal outgoing flows: Either one or several request messages; OR a single reply, next, nextDirect, or goTo message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint make very little sense, which is why only one reply is allowed, and it cannot be combined with request, next or goTo, as then the next stage could also perform a reply.

      Note: The current state and DTO are serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present in the subsequent stage.

      Specified by:
      next in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      nextDto - the object for the next stage's incoming DTO, which must match what the next stage expects. When using this method to skip a request, it probably often makes sense to set it to null, which the next stage then must handle correctly.
    • nextDirect

      public void nextDirect(Object nextDirectDto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Specialized, less resource demanding, and faster "direct" variant of MatsEndpoint.ProcessContext.next(Object) which executes the next stage of a multi-stage endpoint within the same stage processor and transactional demarcation that this stage is in - that is, there is no actual message sent. This ensures that you neither incur any transactional cost nor the overhead of serialization and sending a message on the message queue with subsequent receiving and deserialization.

      The functionality is meant for doing conditional calls, e.g.:

       if (something missing) {
           request another endpoint with reply coming to next stage.
       }
       else {
           directly execute the next stage
       }
       
      This can be of utility if an endpoint in some situations requires more information from a collaborating endpoint, while in other situations it does not require that information. A scenario nextDirect is particularly good for is lazy cache population where the caching of the information only incurs cost if the information is missing, since if the information is present in the cache, you can do a close to zero cost nextDirect invocation. Had you been using ordinary next, you would incur the cost of sending and receiving a message even though the information is present in the cache.
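
      The lazy cache population scenario could look roughly like the sketch below. It is self-contained and does not use the real Mats API: CacheStageContext is a hypothetical stand-in declaring only the two calls involved, and the cache, the endpoint id "RateService.getRate" and the recording test double are invented for illustration. The sketch assumes that the next stage stores a received reply in the cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, cut-down stand-in for MatsEndpoint.ProcessContext.
interface CacheStageContext {
    void request(String endpointId, Object requestDto);

    void nextDirect(Object nextDirectDto);
}

class LazyCacheSketch {
    // Hypothetical in-memory cache: currency code -> exchange rate.
    static final Map<String, Double> RATE_CACHE = new ConcurrentHashMap<>();

    static void lookupStage(CacheStageContext context, String currency) {
        Double cached = RATE_CACHE.get(currency);
        if (cached == null) {
            // Cache miss: pay for a real request. The reply arrives at the next
            // stage, which is assumed to populate the cache.
            context.request("RateService.getRate", currency);
            return;
        }
        // Cache hit: hand the value straight to the next stage without sending a
        // message. Kept as the very last operation of the stage.
        context.nextDirect(cached);
    }
}

// Test double that records outgoing calls, so the sketch runs without a broker.
class RecordingCacheContext implements CacheStageContext {
    final List<String> calls = new ArrayList<>();

    @Override
    public void request(String endpointId, Object requestDto) {
        calls.add("request:" + endpointId);
    }

    @Override
    public void nextDirect(Object nextDirectDto) {
        calls.add("nextDirect:" + nextDirectDto);
    }
}
```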

      Implementation details which are unspecified and whose effects or non-effects shall not be relied on:

      • It is unspecified which thread runs the next stage: The invocation may be done by the same stage processor thread which executes this stage, or the execution may be done by another thread. Therefore, you cannot expect any ThreadLocals set in this lambda to be present in the next.
      • It is unspecified how the next lambda is invoked: You cannot expect this to behave like a method call to the next lambda (i.e. the next stage has been executed when the method returns), nor can you expect the lambda to be executed only when the current lambda has exited (i.e. behaving like a message). Therefore, invocation of this method should be the very last operation in the current lambda so that this does not make a difference.

      Some notes:

      • There is no new message system message created for a nextDirect, so e.g. messageIds when in the next stage will refer to the previous stage's incoming message.
      • Since there is no message system message, the message broker won't see any trace of a nextDirect. This also means that the message counts on the next stage's queue will be lower.
      • The stage processing thread for the stage doing nextDirect is busy while the next stage executes, either by actually executing it, or waiting for the execution, depending on the Mats implementation. If the next stage is slow, any queue build-up will therefore happen on the stage that performed the nextDirect. Any adjustment of concurrency must be done on the stage actually receiving message system messages.
      • It is legal to do "series" of nextDirects, i.e. that one stage does a nextDirect, and then the next stage also does a nextDirect.
      • Neither the state object nor the DTO is serialized and deserialized; both are instead passed directly to the next stage. This ties back to the point about the unspecified way of invoking the next lambda, and that the invocation of nextDirect should be the last operation in the current stage: It is unspecified whether a change to the state object or DTO after the invocation of nextDirect will be visible to the next stage. (This is as opposed to all the other message sending methods, where it is specified that the objects are serialized upon method invocation, and any changes done afterwards will not be visible to the receiver.)
      • If the next stage throws an exception, it will be the current stage that eventually DLQs - this might be confusing when researching an error.
      • If employing MatsTrace (which the JMS impl does), it will contain no record of the nextDirect having been performed. For example, if the message crops up on a DLQ, any nextDirects in the flow will not be visible.
      • There are implications for the Interceptors, some of which might be subtle. E.g. any preprocess and deserialization timings, and message sizes, will be 0. Also, read up on stageCompleted(..) vs. stageCompletedNextDirect(..).

      Note: Legal outgoing flows: Either one or several request messages; OR a single reply, next, nextDirect, or goTo message. Multiple requests are allowed because this can be used in a scatter-gather scenario, where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint make very little sense, which is why only one reply is allowed, and it cannot be combined with request, next, or goTo, as then the next stage could also perform a reply.

      Specified by:
      nextDirect in interface MatsEndpoint.ProcessContext<R>
    • goTo

      public MatsInitiator.MessageReference goTo(String endpointId, Object gotoDto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Sends a message which passes the current call stack over to another endpoint, so that when that endpoint replies, it will return to the endpoint which invoked this endpoint.

      This would be of use in dispatcher scenarios, where you might have a case-style evaluation of the incoming message, and then either handle it yourself, or send the control over to some other endpoint (possibly one of several other endpoints) - so that when they again reply, it will be to the original caller.

      A specific dispatcher-like situation where this could be of use is if you have an endpoint that for some specific (known) entities consumes a particularly large amount of memory. For example, you might have a specific set of frequent customers which when loaded takes much more memory than a normal customer. If you get an influx of orders for these customers at the same time, your standard endpoint with a high concurrency could lead to an out-of-memory situation. A solution here could be to instantiate that same endpoint code at two different endpointIds - the public one, and a private variant with much lower concurrency. The standard, public endpoint would at the initial stage evaluate if this was one of the known memory-hogging customers, and if so, make a context.goTo(..) over to the other private endpoint with lower concurrency, thereby ensuring that the memory usage would be contained. (The endpoint would have to evaluate if it is the public or private instance wrt. whether it should do the eval-then-goTo: Either by looking at its endpointId (check if it is the private, low-concurrency variant), or by use of the initialState-feature, or by modifying the DTO before goTo, or by sideloads.)

      Another use is tail calls, whereby one endpoint A does some preprocessing, and then invokes another endpoint B, but where endpoint A really just wants to directly reply with the reply from endpoint B. The extra reply stage of endpoint A is thus totally useless and just incurs additional message passing and processing. You can instead just goTo endpoint B, which achieves just this outcome: When endpoint B now replies, it will reply to the caller of endpoint A.
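
      The difference between a request and a goTo can be illustrated with a toy model of the call stack. This is a simplified sketch for intuition only, not how the Mats implementation represents the stack: CallStackSketch and its methods are hypothetical, and the stack holds nothing but the names of endpoints awaiting a reply.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CallStackSketch {
    // Toy call stack of a flow: names of the endpoints awaiting a reply.
    private final Deque<String> stack = new ArrayDeque<>();

    // request: the requesting endpoint is pushed, so the reply returns to it.
    void request(String requestingEndpoint) {
        stack.push(requestingEndpoint);
    }

    // goTo: the stack is handed over untouched, so the target endpoint's reply
    // goes to whoever invoked the endpoint that performed the goTo.
    void goTo() {
        // Intentionally empty: nothing is pushed.
    }

    // reply: pops the stack and returns the endpoint that receives the reply.
    String reply() {
        return stack.pop();
    }
}
```

      With a request from A to B, B's reply lands on A, which then needs its own stage just to reply to the caller. With a goTo from A to B, B's reply lands directly on the caller of A.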

      Note: Legal outgoing flows: Either one or several request messages; OR a single reply, next, nextDirect, or goTo message. Multiple requests are allowed because this can be used in a scatter-gather scenario, where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint make very little sense, which is why only one reply is allowed, and it cannot be combined with request, next, or goTo, as then the next stage could also perform a reply.

      Note: The current state and DTO are serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present for the targeted endpoint.

      Specified by:
      goTo in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      endpointId - which endpoint to go to.
      gotoDto - the message that should be sent to the specified endpoint. Will in a dispatcher scenario often be the incoming DTO for the current endpoint, while in a tail call situation be the requestDto for the target endpoint.
    • goTo

      public MatsInitiator.MessageReference goTo(String endpointId, Object gotoDto, Object initialTargetSto)
      Description copied from interface: MatsEndpoint.ProcessContext
      Variation of MatsEndpoint.ProcessContext.goTo(String, Object) method, where the incoming state is sent along.

      This only makes sense if the same code base "owns" both the current endpoint, and the endpoint to which this message is sent.

      Specified by:
      goTo in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      endpointId - which endpoint to go to.
      gotoDto - the message that should be sent to the specified endpoint. Will in a dispatcher scenario often be the incoming DTO for the current endpoint, while in a tail call situation be the requestDto for the target endpoint.
      initialTargetSto - the object which the target endpoint will get as its initial stage STO (State Transfer Object).
    • initiate

      public void initiate(MatsInitiator.InitiateLambda lambda)
      Description copied from interface: MatsEndpoint.ProcessContext
      Initiates a new message out to an endpoint. This is effectively the same as invoking the same method on a MatsInitiator gotten via MatsFactory.getOrCreateInitiator(String), only that this way works within the transactional context of the MatsStage which this method is invoked within. Also, the traceId and from-endpointId are predefined, but it is still recommended to set the traceId, as that will append the new string to the existing traceId, making log tracking (e.g. when debugging) better.

      IMPORTANT NOTICE!! The MatsInitiator returned from MatsFactory.getDefaultInitiator() is "magic" in that when employed from within a Mats Stage's context (thread), it works exactly as this method: Any initiations performed participate in the Mats Stage's transactional demarcation. Read more in the JavaDoc of the default initiator.

      Specified by:
      initiate in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      lambda - provides the MatsInitiator.MatsInitiate instance on which to create the message to be sent.
    • doAfterCommit

      public void doAfterCommit(Runnable runnable)
      Description copied from interface: MatsEndpoint.ProcessContext
      The Runnable will be performed after messaging and external resources (DB) have been committed. An example can be if the Mats-lambda inserts a row in a database that should be processed by some other component (i.e. a service running with some Threads), and thus wants to wake up that component, telling it that new work is available. The problem is then that if this "wakeUp()" call is done within the lambda, the row is not technically there yet, as we're still within the SQL transaction demarcation. Therefore, if the process-service wakes up really fast and tries to find the new work, it will not see anything yet. (It might then presume that e.g. another node of the service-cluster took care of whatever woke it up, and go back to sleep.)

      Note: This is per processing; Setting it is only relevant for the current message. If you invoke the method more than once, only the last Runnable will be run. If you set it to null, you "cancel" any previously set Runnable.

      Note: If any Exception is raised from the stage lambda code after the Runnable has been set, or any Exception is raised by the processing or committing, the Runnable will not be run.

      Note: If the doAfterCommit Runnable throws a RuntimeException, it will be logged on ERROR level, then ignored.
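
      The three notes above can be summarized in a small, self-contained model. This is a sketch of the described semantics only, not the actual implementation: AfterCommitSketch and its process method are hypothetical, "commit" and "rollback" are just log entries, and the ERROR logging is simulated by adding a line to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class AfterCommitSketch {
    // Only the most recently set Runnable is kept; null means "none".
    private Runnable afterCommit;
    final List<String> log = new ArrayList<>();

    void doAfterCommit(Runnable runnable) {
        this.afterCommit = runnable;
    }

    // Models one message processing: run the stage lambda, "commit", then run
    // the after-commit Runnable if one is set.
    void process(Consumer<AfterCommitSketch> stageLambda) {
        afterCommit = null; // The setting is per processing.
        try {
            stageLambda.accept(this);
            log.add("commit");
        }
        catch (RuntimeException e) {
            // Stage or commit failed: the Runnable is not run.
            log.add("rollback");
            return;
        }
        if (afterCommit != null) {
            try {
                afterCommit.run();
            }
            catch (RuntimeException e) {
                // Simulated "log on ERROR level, then ignore".
                log.add("ERROR: " + e.getMessage());
            }
        }
    }
}
```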

      Specified by:
      doAfterCommit in interface MatsEndpoint.ProcessContext<R>
      Parameters:
      runnable - the code to run right after the transaction of both external resources and messaging has been committed. Setting to null "cancels" any previously set Runnable.
    • getAttribute

      public <T> Optional<T> getAttribute(Class<T> type, String... name)
      Description copied from interface: MatsEndpoint.ProcessContext
      Provides a way to get hold of (optional) attributes/objects from the Mats implementation, either specific to the Mats implementation in use, or configured into this instance of the Mats implementation. Is mirrored by the same method at MatsInitiator.MatsInitiate.getAttribute(Class, String...). There is also a ThreadLocal-accessible version at MatsFactory.ContextLocal.getAttribute(Class, String...).

      Mandatory: If the Mats implementation has a transactional SQL Connection, it shall be available by 'context.getAttribute(Connection.class)'.
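
      The following sketch shows how a caller might deal with the Optional result. It is self-contained and does not use the real Mats API: AttributeLookupSketch is a hypothetical stand-in whose getAttribute has the same signature as the method documented here, backed by a plain map, and which ignores the name arguments.

```java
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a context offering getAttribute(Class, String...).
class AttributeLookupSketch {
    private final Map<Class<?>, Object> attributes = new HashMap<>();

    <T> void put(Class<T> type, T value) {
        attributes.put(type, value);
    }

    // Same shape as the documented method. The names are ignored in this sketch.
    <T> Optional<T> getAttribute(Class<T> type, String... name) {
        return Optional.ofNullable(type.cast(attributes.get(type)));
    }

    // Caller side: never assume the attribute is present.
    static String describe(AttributeLookupSketch context) {
        return context.getAttribute(Connection.class)
                .map(connection -> "transactional SQL Connection available")
                .orElse("no SQL Connection configured");
    }
}
```

      Handling the empty case explicitly matters because whether an attribute exists depends on the Mats implementation and how it is configured.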

      Specified by:
      getAttribute in interface MatsEndpoint.ProcessContext<R>
      Type Parameters:
      T - The type of the attribute.
      Parameters:
      type - The expected type of the attribute
      name - The (optional) (hierarchical) name(s) of the attribute.
      Returns:
      Optional of the attribute in question, the optionality pointing out that it depends on the Mats implementation or configuration whether it is available.
      See Also: