Package io.mats3.impl.jms
Class JmsMatsProcessContext<R,S,Z>
java.lang.Object
io.mats3.impl.jms.JmsMatsProcessContext<R,S,Z>
- All Implemented Interfaces:
JmsMatsStatics
,MatsEndpoint.DetachedProcessContext
,MatsEndpoint.ProcessContext<R>
public class JmsMatsProcessContext<R,S,Z>
extends Object
implements MatsEndpoint.ProcessContext<R>, JmsMatsStatics
The JMS Mats implementation of
MatsEndpoint.ProcessContext
. Instantiated for each incoming JMS message that is processed,
given to the MatsStage
's process lambda.-
Field Summary
Fields inherited from interface io.mats3.impl.jms.JmsMatsStatics
EXTRA_GRACE_MILLIS, ILLEGAL_CALL_FLOWS, JMS_MSG_PROP_AUDIT, JMS_MSG_PROP_DISPATCH_TYPE, JMS_MSG_PROP_FROM, JMS_MSG_PROP_INITIALIZING_APP, JMS_MSG_PROP_INITIATOR_ID, JMS_MSG_PROP_MATS_MESSAGE_ID, JMS_MSG_PROP_MESSAGE_TYPE, JMS_MSG_PROP_TO, JMS_MSG_PROP_TRACE_ID, LOG_PREFIX, MAX_STACK_HEIGHT, MAX_TOTAL_CALL_NUMBER, MDC_MATS_APP_NAME, MDC_MATS_APP_VERSION, MDC_MATS_CALL_NUMBER, MDC_MATS_IN_MESSAGE_SYSTEM_ID, MDC_MATS_INIT, MDC_MATS_OUT_MATS_MESSAGE_ID, MDC_MATS_STAGE, MDC_MATS_STAGE_ID, MDC_MATS_STAGE_INDEX, MDC_TRACE_ID, NO_INVOCATION_POINT, RANDOM_ALPHABET, THREAD_PREFIX, TOTAL_JMS_MSG_PROPS_SIZE
-
Method Summary
Modifier and TypeMethodDescriptionvoid
Attaches a binary payload ("sideload") to the next outgoing message, being it a request or a reply.void
Attaches aString
payload ("sideload") to the next outgoing message, being it a request or a reply.void
doAfterCommit
(Runnable runnable) The Runnable will be performed after messaging and external resources (DB) have been committed.<T> Optional<T>
getAttribute
(Class<T> type, String... name) Provides a way to get hold of (optional) attributes/objects from the Mats implementation, either specific to the Mats implementation in use, or configured into this instance of the Mats implementation.byte[]
Get binary "sideloads" from the incoming message.GetString
"sideloads" from the incoming message.<T> T
getTraceProperty
(String propertyName, Class<T> clazz) Retrieves the Mats Trace property with the specified name, deserializing the value to the specified class, using the active MATS serializer.Sends a message which passes the current call stack over to another endpoint, so that when that endpoint replies, it will return to the endpoint which invoked this endpoint.Variation ofMatsEndpoint.ProcessContext.goTo(String, Object)
method, where the incoming state is sent along.void
initiate
(MatsInitiator.InitiateLambda lambda) Initiates a new message out to an endpoint.boolean
This is relevant if stashing or otherwise when a stage is accessing an external system (e.g.boolean
Hint to monitoring/logging/auditing systems that this call flow is not very valuable to fully audit, typically because it is just a "getter" of information for display to a user, or is health check request to see if the endpoint is up and answers in a timely manner.boolean
This is relevant if stashing or otherwise when a stage is accessing an external system (e.g.void
logMeasurement
(String metricId, String metricDescription, String baseUnit, double measure, String... labelKeyValue) Adds a measurement of a described variable, in a base unit, for this Stage - be sure to understand that the three String parameters are constants for each measurement. To exemplify, you may measure five different things in a Stage, i.e.void
logTimingMeasurement
(String metricId, String metricDescription, long nanos, String... labelKeyValue) Same asaddMeasurement(..)
, but specifically for timings - Read that JavaDoc!Sends a message which passes the control to the next stage of a multi-stage endpoint.void
nextDirect
(Object nextDirectDto) Specialized, less resource demanding, and faster "direct" variant ofMatsEndpoint.ProcessContext.next(Object)
which executes the next stage of a multi-stage endpoint within the same stage processor and transactional demarcation that this stage is in - that is, there is no actual message sent.Sends a reply to the requesting service.Sends a request message, meaning that the specified endpoint will be invoked, with the reply-to endpointId set to the next stage in the multi-stage endpoint.void
setTraceProperty
(String propertyName, Object propertyValue) Adds a property that will "stick" with the Mats Trace from this call on out.byte[]
stash()
Returns a binary representation of the current Mats flow's incoming execution point, which can beunstashed
again at a later time using theMatsInitiator
, thereby providing a simplistic "continuation" feature in Mats.toString()
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface io.mats3.impl.jms.JmsMatsStatics
createFlowId, getInvocationPoint, id, id, idThis, ms3, produceAndSendMsgSysMessages, randomString, setConcurrencyWithLog, stageOrInit
Methods inherited from interface io.mats3.MatsEndpoint.ProcessContext
unwrapFully
-
Method Details
-
getStageId
- Specified by:
getStageId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the stageId that is processed, i.e. the id of this stage. It will be equal to
MatsEndpoint.DetachedProcessContext.getEndpointId()
for the first stage in multi-stage-endpoint, and for the sole stage of a single-stage and terminator endpoint. Should probably never be necessary, but accessible for introspection.
-
getFromAppName
- Specified by:
getFromAppName
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
AppName
of the MatsFactory from which the currently processing message came. Thus, if this message is the result of a 'next' call, it will be yourself.
-
getFromAppVersion
- Specified by:
getFromAppVersion
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
AppVersion
of the MatsFactory from which the currently processing message came. Thus, if this message is the result of a 'next' call, it will be yourself.
-
getFromStageId
- Specified by:
getFromStageId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the stageId from which the currently processing message came. Note that the stageId of the initial
stage of an endpoint is equal to the endpointId. If this endpoint is the initial target of the
initiation, this value is equal to
MatsEndpoint.DetachedProcessContext.getInitiatorId()
.
-
getFromTimestamp
- Specified by:
getFromTimestamp
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
Instant
which this message was created on the sending stage.
-
getInitiatingAppName
- Specified by:
getInitiatingAppName
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
AppName
of the MatsFactory that initiated the Flow which the currently processing is a part of. Thus, if this endpoint is the initial target of the initiation, this value is equal toMatsEndpoint.DetachedProcessContext.getFromAppName()
.
-
getInitiatingAppVersion
- Specified by:
getInitiatingAppVersion
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
AppVersion
of the MatsFactory that initiated the Flow which the currently processing is a part of. Thus, if this endpoint is the initial target of the initiation, this value is equal toMatsEndpoint.DetachedProcessContext.getFromAppVersion()
.
-
getInitiatorId
- Specified by:
getInitiatorId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the "initiatorId" set by the initiation with
MatsInitiator.MatsInitiate.from(String)
.
-
getInitiatingTimestamp
- Specified by:
getInitiatingTimestamp
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
Instant
which this message was initiated, i.e. sent from a MatsInitiator (or within a Stage).
-
getMatsMessageId
- Specified by:
getMatsMessageId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the unique messageId for the incoming message from Mats - which can be used to catch double-deliveries.
-
getSystemMessageId
- Specified by:
getSystemMessageId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the unique messageId for the incoming message, from the underlying message system - which could be
used to catch double-deliveries, but do prefer
MatsEndpoint.DetachedProcessContext.getMatsMessageId()
. (For a JMS Implementation, this will be the "JMSMessageID").
-
isNonPersistent
public boolean isNonPersistent()Description copied from interface:MatsEndpoint.DetachedProcessContext
This is relevant if stashing or otherwise when a stage is accessing an external system (e.g. another MQ) which have a notion of persistence.- Specified by:
isNonPersistent
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- whether the current Mats flow is non-persistent - read
MatsInitiator.MatsInitiate.nonPersistent()
.
-
isInteractive
public boolean isInteractive()Description copied from interface:MatsEndpoint.DetachedProcessContext
This is relevant if stashing or otherwise when a stage is accessing an external system (e.g. another MQ) which have a notion of prioritization.- Specified by:
isInteractive
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- whether the current Mats flow is interactive (prioritized) - read
MatsInitiator.MatsInitiate.interactive()
.
-
isNoAudit
public boolean isNoAudit()Description copied from interface:MatsEndpoint.DetachedProcessContext
Hint to monitoring/logging/auditing systems that this call flow is not very valuable to fully audit, typically because it is just a "getter" of information for display to a user, or is health check request to see if the endpoint is up and answers in a timely manner.- Specified by:
isNoAudit
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- whether the current Mats flow is "no audit" - read
MatsInitiator.MatsInitiate.noAudit()
.
-
getBytesKeys
- Specified by:
getBytesKeys
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the keys on which
bytes
lives.
-
toString
- Specified by:
toString
in interfaceMatsEndpoint.DetachedProcessContext
- Overrides:
toString
in classObject
- Returns:
- a for-human-consumption, multi-line debug-String representing the current processing context, typically the "MatsTrace" up to the current stage. The format is utterly arbitrary, can and will change between versions and revisions, and shall NOT be used programmatically!!
-
getTraceId
- Specified by:
getTraceId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the
trace id
for the processed message. - See Also:
-
getEndpointId
- Specified by:
getEndpointId
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the endpointId that is processed, i.e. the id of this endpoint. Should probably never be necessary, but accessible for introspection.
-
getBytes
Description copied from interface:MatsEndpoint.DetachedProcessContext
Get binary "sideloads" from the incoming message.- Specified by:
getBytes
in interfaceMatsEndpoint.DetachedProcessContext
- Parameters:
key
- the key for which to retrieve a binary payload from the incoming message.- Returns:
- the requested byte array.
- See Also:
-
getStringKeys
- Specified by:
getStringKeys
in interfaceMatsEndpoint.DetachedProcessContext
- Returns:
- the keys on which
strings
lives.
-
getString
Description copied from interface:MatsEndpoint.DetachedProcessContext
GetString
"sideloads" from the incoming message.- Specified by:
getString
in interfaceMatsEndpoint.DetachedProcessContext
- Parameters:
key
- the key for which to retrieve a String payload from the incoming message.- Returns:
- the requested String.
- See Also:
-
addBytes
Description copied from interface:MatsEndpoint.ProcessContext
Attaches a binary payload ("sideload") to the next outgoing message, being it a request or a reply. Note that for initiations, you have the same method on theMatsInitiator.MatsInitiate
instance. The rationale for having this is to not have to encode a largish byte array inside the JSON structure that carries the Request or Reply DTO - byte arrays represent very badly in JSON. Note: The byte array is not compressed (as might happen with the DTO), so if the payload is large, you might want to consider compressing it before attaching it (and will then have to decompress it on the receiving side). Note: This will be added to the subsequentrequest
,reply
ornext
message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Anyinitiations
are separate from this, neither getting nor consuming binaries, strings nor trace properties set on theProcessContext
- they must be set on theMatsInitiate
instance within the initiate-lambda).- Specified by:
addBytes
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
key
- the key on which to store the byte array payload. The receiver will have to use this key to get the payload out again, so either it will be a specific key that the sender and receiver agree upon, or you could generate a random key, and reference this key as a field in the outgoing DTO.payload
- the payload to store.- See Also:
-
addString
Description copied from interface:MatsEndpoint.ProcessContext
Attaches aString
payload ("sideload") to the next outgoing message, being it a request or a reply. Note that for initiations, you have the same method on theMatsInitiator.MatsInitiate
instance. The rationale for having this is to not have to encode a largish string document inside the JSON structure that carries the Request or Reply DTO. Note: The String payload is not compressed (as might happen with the DTO), so if the payload is large, you might want to consider compressing it before attaching it and instead use theaddBytes(..)
method (and will then have to decompress it on the receiving side). Note: This will be added to the subsequentrequest
,reply
ornext
message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Anyinitiations
are separate from this, neither getting nor consuming binaries, strings nor trace properties set on theProcessContext
- they must be set on theMatsInitiate
instance within the initiate-lambda).- Specified by:
addString
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
key
- the key on which to store the String payload. The receiver will have to use this key to get the payload out again, so either it will be a specific key that the sender and receiver agree upon, or you could generate a random key, and reference this key as a field in the outgoing DTO.payload
- the payload to store.- See Also:
-
setTraceProperty
Description copied from interface:MatsEndpoint.ProcessContext
Adds a property that will "stick" with the Mats Trace from this call on out. Note that for initiations, you have the same method on theMatsInitiator.MatsInitiate
instance. The functionality effectively acts like aThreadLocal
when compared to normal java method invocations: If the Initiator adds it, all subsequent stages will see it, on any stack level, including the terminator. If a stage in a service nested some levels down in the stack adds it, it will be present in all subsequent stages including all the way to the Terminator. Note that any initiations within a Stage will also inherit trace properties present on the Stage's incoming message. Possible use cases: You can for example "sneak along" some property meant for Service X through an invocation of intermediate Service A (which subsequently calls Service X), where the signature (DTO) of the intermediate Service A does not provide such functionality. Another usage would be to add some "global context variable", e.g. "current user", that is available for any down-stream Service that requires it. Both of these scenarios can obviously lead to pretty hard-to-understand code if used extensively: When employed, you should code rather defensively, where if this property is not present when a stage needs it, it should throwMatsEndpoint.MatsRefuseMessageException
and clearly explain that the property needs to be present. Note: This will be added to the subsequentrequest
,reply
ornext
message - and then cleared. Thus, if you perform multiple request or next calls, then each must have their binaries, strings and trace properties set separately. (Anyinitiations
are separate from this, neither getting nor consuming binaries, strings nor trace properties set on theProcessContext
- they must be set on theMatsInitiate
instance within the initiate-lambda). Note: incoming trace properties (that was present on the incoming message) will be added to all outgoing message, including initiations within the stage.- Specified by:
setTraceProperty
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
propertyName
- the name of the propertypropertyValue
- the value of the property, which will be serialized using the active MATS serializer.- See Also:
-
logMeasurement
public void logMeasurement(String metricId, String metricDescription, String baseUnit, double measure, String... labelKeyValue) Description copied from interface:MatsEndpoint.ProcessContext
Adds a measurement of a described variable, in a base unit, for this Stage - be sure to understand that the three String parameters are constants for each measurement. To exemplify, you may measure five different things in a Stage, i.e. "number of items in order", "total amount for order in dollar", etc - and each of these obviously have different metricId, metricDescription and possibly different baseUnit from each other. BUT, the specific arguments for "number of items in order" (outside the measure itself!) shall not change between one Stage processing and the next: e.g. the metricDescription shall NOT be dynamically constructed to e.g. say "Number of items in order 1234 for customer 5678". Note: It is illegal to use the same 'metricId' for more than one measurement for a given stage, and this also goes between measurements andtiming measurements
. Inclusion as metric by plugin 'mats-intercept-micrometer': A new meter will be created (and cached), of typeDistributionSummary
, with the 'name' set to"mats.exec.ops.measure.{metricId}.{baseUnit}"
("measure"->"time" for timings), and 'description' to description. (Tags/labels already added on the meter by the plugin include 'appName', 'initiatorId', 'initiatingAppName', 'stageId' (for stages), and 'initiatorName' (for inits)). Read about parameter 'labelKeyValue' below. Inclusion as log line by plugin 'mats-intercept-logging': A log line will be output by each added measurement, where the MDC for that log line will have an entry with key"mats.ops.measure.{metricId}.{baseUnit}"
. Read about parameter 'labelKeyValue' below. It generally makes most sense if the same metrics are added for each processing of a particular Stage, i.e. if the "number of items" are 0, then that should also be recorded along with the "total amount for order in dollar" as 0, not just elided. Otherwise, your metrics will be skewed. You should use a dot-notation for the metricId if you want to add multiple meters with a hierarchical/subdivision layout. The vararg 'labelKeyValue' is an optional element where the String-array consist of one or several alternate key, value pairs. Do not employ this feature unless you know what the effects are, and you actually need it! This will be added as labels/tags to the metric, and added to the SLF4J MDC for the measurement log line with the key being"mats.ops.measure.{metricId}.{labelKey}"
("measure"->"time" for timings). The keys should be constants as explained for the other parameters, while the value can change, but only between a given set of values (thinkenum
) - using e.g. the 'customerId' as value doesn't make sense and will blow up your metric cardinality. Notice that if you do employ e.g. two labels, each having one of three values, you'll effectively create 9 different meters, where your measurement will go to one of them. NOTICE: If you want to do a timing, then instead useMatsEndpoint.ProcessContext.logTimingMeasurement(String, String, long, String...)
- Specified by:
logMeasurement
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
metricId
- constant, short, possibly dot-separated if hierarchical, id for this particular metric, e.g. "items" or "amount", or "db.query.orders".metricDescription
- constant, textual description for this metric, e.g. "Number of items in customer order", "Total amount of customer order"baseUnit
- the unit for this measurement, e.g. "quantity" (for a count measure), "dollar" (for an amount), or "bytes" (for a document size).measure
- value of the measurementlabelKeyValue
- a String-vararg array consisting of alternate key,value pairs which will becomes labels or tags or entries for the metrics and log lines. Read the JavaDoc above; the keys shall be "static" for a specific measure, while the values can change between a specific small set values.
-
logTimingMeasurement
public void logTimingMeasurement(String metricId, String metricDescription, long nanos, String... labelKeyValue) Description copied from interface:MatsEndpoint.ProcessContext
Same asaddMeasurement(..)
, but specifically for timings - Read that JavaDoc! Note: It is illegal to use the same 'metricId' for more than one measurement for a given stage, and this also goes between timing measurements andmeasurements
. For the metrics-plugin 'mats-intercept-micrometer' plugin, the 'baseUnit' argument is deduced to whatever is appropriate for the receiving metrics system, e.g. for Prometheus it is "seconds", even though you always record the measurement in nanoseconds using this method. For the logging-plugin 'mats-intercept-logging' plugin, the timing in the log line will be in milliseconds (with fractions), even though you always record the measurement in nanoseconds using this method.- Specified by:
logTimingMeasurement
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
metricId
- constant, short, possibly dot-separated if hierarchical, id for this particular metric, e.g. "db.query.orders" or "calcprofit".metricDescription
- constant, textual description for this metric, e.g. "Time taken to execute order query", "Time taken to calculate profit or loss".nanos
- time taken in nanosecondslabelKeyValue
- a String-vararg array consisting of alternate key,value pairs which will becomes labels or tags or entries for the metrics and log lines. Read the JavaDoc ataddMeasurement(..)
-
stash
public byte[] stash()Description copied from interface:MatsEndpoint.ProcessContext
Returns a binary representation of the current Mats flow's incoming execution point, which can beunstashed
again at a later time using theMatsInitiator
, thereby providing a simplistic "continuation" feature in Mats. You will have to find storage for these bytes yourself - an obvious place is the co-transactional database that the stage typically has available. This feature gives the ability to "pause" the current Mats flow, and later restore the execution from where it left off, probably with some new information that have been gathered in the meantime. This can typically relieve the Mats Stage Processing thread from having to wait for another service's execution (whose execution must then be handled by some other thread). This could be a longer running process, or a process whose execution time is variable, maybe residing on a Mats-external service structure: E.g. some REST service that sometimes lags, or sometimes is down in smaller periods. Or a service on a different Message Broker. Once this "Mats external" processing has finished, that thread can invokeunstash(stashBytes,...)
to get the Mats flow going again. Notice that functionally, the unstash-operation is a kind of initiation, only that this type of initiation doesn't start a new Mats flow, rather continuing an existing flow. Notice that this feature should not typically be used to "park" a Mats flow for days. One might have a situation where a part of an order flow potentially needs manual handling, e.g. validating a person's identity if this has not been validated before. It might (should!) be tempting to employ the stash function then: Stash the Mats flow in a database. Make a GUI where the ID-validation can be performed by some employee. When the ID is either accepted or denied, you unstash the Mats flow with the result, getting a very nice continuous mats flow for new orders which is identical whether or not ID validation needs to be performed. However, if this ID-validation process can take days or weeks to execute, it will be a poor candidate for the stash-feature. The reason is that embedded within the execution context which you get a binary serialization of, there might be several serialized state representations of the endpoints laying upstream of this call flow. When you "freeze" these by invoking stash, you have immediately made a potential future deserialization-crash if you change the code of those upstream endpoints (which quite probably resides in different code bases than the one employing the stash feature), specifically changes of the state classes they employ: When you deploy these code changes while having multiple flows frozen in stashes, you will have a problem when they are later unstashed and the Mats flow returns to those endpoints whose state classes won't deserialize back anymore. It is worth noting that you always have these problems when doing deploys where the state classes of Mats endpoints change - it is just that usually, there won't be any, and at least not many, such flows in execution at the precise deploy moment (and also, that changing the state classes are in practice really not that frequent). However, by stashing over days, instead of a normal Mats flow that take seconds, you massively increase the time window in which such deserialization problems can occur. You at least have to consider this if employing the stash-functionality. Note about data and metadata which should be stored along with the stash-bytes: You only get a binary serialized incoming execution context in return from this method (which includes the incoming message, incoming state, the execution stack andtrace properties
, but not "sideloaded"bytes
andstrings
). The returned byte array are utterly opaque seen from the Mats API side (however, depending on the serialization mechanism employed in the Mats implementation, you might be able to peek into them anyway - but this should at most be used for debugging/monitoring introspection). Therefore, any information from the incoming message, or from your state object, or anything else from theMatsEndpoint.DetachedProcessContext
which is needed to actually execute the job that should be performed outside of the Mats flow, must be picked out manually before exiting the process lambda. This also goes for "sideloaded" objects (bytes
andstrings
) - which will not be available inside the unstashed process lambda (they are not a part of the stash-bytes). Also, you should for debugging/monitoring purposes also store at least the Mats flow'sTraceId
and a timestamp along with the stash and data. You should probably also have some kind of monitoring / health checks for stashes that have become stale - i.e. stashes that have not been unstashed for a considerable time, and whose Mats flow have thus stopped up, and where the downstream endpoints/stages therefore will not get invoked. Notes:- Invoking
stash()
will not affect the stage processing in any way other than producing a serialized representation of the current incoming execution point. You can still send out messages. You could even reply, but then, what would be the point of stashing? - Repeated invocations within the same stage will yield (effectively) the same stash, as any processing
done inside the stage before invoking
stash()
don't affect the incoming execution point. - You will have to exit the current process lambda yourself - meaning that this cannot be used in a
lastStage
, as you cannot return from such a stage without actually sending a reply (return null
replies withnull
). Instead employ anormal stage
, usingMatsEndpoint.ProcessContext.reply(Object)
to return a reply if needed. - Mats won't care if you unstash() the same stash multiple times, but your downstream parts of the Mats flow might find this a bit strange.
- Specified by:
stash
in interfaceMatsEndpoint.ProcessContext<R>
- Returns:
- a binary representation of the current Mats flow's incoming execution point (i.e. any incoming state and the incoming message - along with the Mats flow stack at this point). It shall start with the 4 ASCII letters "MATS", and then 4 more letters representing which mechanism is employed to construct the rest of the byte array.
- Invoking
-
getTraceProperty
Description copied from interface:MatsEndpoint.DetachedProcessContext
Retrieves the Mats Trace property with the specified name, deserializing the value to the specified class, using the active MATS serializer. Read more onMatsEndpoint.ProcessContext.setTraceProperty(String, Object)
.- Specified by:
getTraceProperty
in interfaceMatsEndpoint.DetachedProcessContext
- Parameters:
propertyName
- the name of the Mats Trace property to retrieve.clazz
- the class to which the value should be deserialized.- Returns:
- the value of the Mats Trace property, deserialized as the specified class.
- See Also:
-
reply
Description copied from interface:MatsEndpoint.ProcessContext
Sends a reply to the requesting service. When returning an object from alastStage
lambda, this is the method that actually gets invoked. This will be ignored if there is no endpointId on the stack, i.e. if this endpoint it is semantically a terminator (thereplyTo
of an initiation's request), or if it is the last stage of an endpoint that was invoked directly (usingMatsInitiate.send(msg)
). It is possible to do "early return" in a multi-stage endpoint by invoking this method in a stage that is not the last. Note: Legal outgoing flows: Either one or severalrequest
messages; OR a singlereply
,next
,nextDirect
, orgoTo
message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint makes very little sense, which is why only one reply is allowed, and it cannot be combined with request or next, nor goto, as then the next stage could also perform a reply. Note: The current state and DTO is serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present on the reply-receiving stage.- Specified by:
reply
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
replyDto
- the reply DTO to return to the invoker.
-
request
Description copied from interface:MatsEndpoint.ProcessContext
Sends a request message, meaning that the specified endpoint will be invoked, with the reply-to endpointId set to the next stage in the multi-stage endpoint. This will throw if the current process stage is a terminator, single-stage endpoint or the last endpoint of a multi-stage endpoint, as there then is no next stage to reply to. Note: Legal outgoing flows: Either one or severalrequest
messages; OR a singlereply
,next
,nextDirect
, orgoTo
message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint makes very little sense, which is why only one reply is allowed, and it cannot be combined with request or next, nor goto, as then the next stage could also perform a reply. Note: The current state and DTO is serialized when invoking this method. This means that in case of multiple requests, you may change the state in between each request, and the next stage will get different "incoming states" for each of the replies, which may be of use in a scatter-gather scenario.- Specified by:
request
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
endpointId
- which endpoint to invokerequestDto
- the message that should be sent to the specified endpoint.
-
next
Description copied from interface:MatsEndpoint.ProcessContext
Sends a message which passes the control to the next stage of a multi-stage endpoint. The functionality is meant for doing conditional requests, e.g.:if (something missing) { request another endpoint with reply coming to next stage. } else { pass to next stage }
This can be of utility if an endpoint in some situations requires more information from a collaborating endpoint, while in other situations it does not require that information. Note: You should rather useMatsEndpoint.ProcessContext.nextDirect(Object)
if your transactional demarcation needs allows it! Note: Legal outgoing flows: Either one or severalrequest
messages; OR a singlereply
,next
,nextDirect
, orgoTo
message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint makes very little sense, which is why only one reply is allowed, and it cannot be combined with request or next, nor goto, as then the next stage could also perform a reply. Note: The current state and DTO is serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present in the subsequent stage.- Specified by:
next
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
nextDto
- the object for the next stage's incoming DTO, which must match what the next stage expects. When using this method to skip a request, it probably often makes sense to set it tonull
, which the next stage then must handle correctly.- See Also:
-
nextDirect
Description copied from interface:MatsEndpoint.ProcessContext
Specialized, less resource demanding, and faster "direct" variant ofMatsEndpoint.ProcessContext.next(Object)
which executes the next stage of a multi-stage endpoint within the same stage processor and transactional demarcation that this stage is in - that is, there is no actual message sent. This ensures that you neither incur any transactional cost nor the overhead of serialization and sending a message on the message queue with subsequent receiving and deserialization. The functionality is meant for doing conditional calls, e.g.:if (something missing) { request another endpoint with reply coming to next stage. } else { directly execute the next stage }
This can be of utility if an endpoint in some situations requires more information from a collaborating endpoint, while in other situations it does not require that information. A scenario nextDirect is particularly good for is lazy cache population where the caching of the information only incurs cost if the information is missing, since if the information is present in the cache, you can do a close to zero cost nextDirect invocation. Had you been usingordinary next
, you would incur the cost of sending and receiving a message even though the information is present in the cache. Implementation details which are unspecified and whose effects or non-effects shall not be relied on:- It is unspecified which thread runs the next stage: The invocation may be done by the same stage processor thread which executes this stage, or the execution may be done by another thread. Therefore, you cannot expect any ThreadLocals set in this lambda to to be present in the next.
- It is unspecified how the next lambda is invoked: You cannot expect this to behave like a method call to the next lambda (i.e. the next stage has been executed when the method returns), nor can you expect the lambda to be executed only when the current lambda has exited (i.e. behaving like a message). Therefore, invocation of this method should be the very last operation in the current lambda so that this does not make a difference.
- There is no new message system message created for a nextDirect, so e.g. messageIds when in the next stage will refer to the previous stage's incoming message.
- Since there is no message system message, the message broker won't see any trace of a nextDirect. This also means that you'll get less counts of messages on the next stage's queue.
- The stage processing thread for the stage doing nextDirect is busy while the next stage executes, either by actually executing it, or waiting for the execution, depending on the Mats implementation. If the next stage is slow, any queue build-up will therefore happen on the stage that performed the nextDirect. Any adjustment of concurrency must be done on the stage actually receiving message system messages.
- It is legal to do "series" of nextDirects, i.e. that one stage does a nextDirect, and then the next stage also does a nextDirect.
- Neither the state object or the DTO is serialized and deserialized, but instead passed directly to the next stage. This ties back to the point about unspecified way of invoking the next lambda and that the invocation of nextDirect should be the last operation in the current stage: It is unspecified whether a change to the state object or DTO after the invocation of nextDirect will be visible to the next stage. (This as opposed to all the other message sending methods, where it is specified that the objects are serialized upon method invocation, and any changes done afterwards will not be visible to the receiver.)
- If the next stage throws an exception, it will be the current stage that eventually DLQs - this might be confusing when researching an error.
- If employing MatsTrace (which the JMS impl do), it will contain no record of the nextDirect having been performed. For example, if the message crops up on a DLQ, any nextDirects in the flow will not be visible.
- There are implications for the Interceptors, some of which might be subtle. E.g. any preprocess and
deserialization timings, and message sizes, will be 0. Also, read up on the
stageCompleted(..)
vs.stageCompletedNextDirect(..)
request
messages; OR a singlereply
,next
,nextDirect
, orgoTo
message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint makes very little sense, which is why only one reply is allowed, and it cannot be combined with request or next, nor goto, as then the next stage could also perform a reply.- Specified by:
nextDirect
in interfaceMatsEndpoint.ProcessContext<R>
-
goTo
Description copied from interface:MatsEndpoint.ProcessContext
Sends a message which passes the current call stack over to another endpoint, so that when that endpoint replies, it will return to the endpoint which invoked this endpoint. This would be of use in dispatcher scenarios, where you might have a case-style evaluation of the incoming message, and then either handle it yourself, or send the control over to some other endpoint (possibly one of several other endpoints) - so that when they again reply, it will be to the original caller. A specific dispatcher-like situation where this could be of use, is if you have an endpoint that for some specific (known) entities consumes a particularly large amount of memory. For example, you might have a specific set of frequent customers which when loaded takes much more memory than a normal customer. If you get an influx of orders for these customers at the same time, your standard endpoint with a high concurrency could lead to an out-of-memory situation. A solution here could be to instantiate that same endpoint code at two different endpointIds - the public one, and a private variant with much lower concurrency. The standard, public endpoint would at the initial stage evaluate if this was one of the known memory-hogging customers, and if so, make a context.goto(..) over to the other private endpoint with lower concurrency thereby ensuring that the memory usage would be contained. (The endpoint would have to evaluate if it is the public or private instance wrt. whether it should do the eval-then-goto: Either by looking at its endpointId (check if it is the private, low-concurrency variant), or by use of theinitialState
-feature, or by modifying the DTO before goTo, or bysideloads
). Another use is tail calls, whereby one endpoint A does some preprocessing, and then invokes another endpoint B, but where endpoint A really just want to directly reply with the reply from endpoint B. The extra reply stage of endpoint A is thus totally useless and just incurs additional message passing and processing. You can instead just goTo endpoint B, which achieves just this outcome: When endpoint B now replies, it will reply to the caller of endpoint A. Note: Legal outgoing flows: Either one or severalrequest
messages; OR a singlereply
,next
,nextDirect
, orgoTo
message. The reason that multiple requests are allowed is that this could be used in a scatter-gather scenario - where the replies come in to the next stage of the same endpoint. However, multiple replies to the invoking endpoint makes very little sense, which is why only one reply is allowed, and it cannot be combined with request or next, nor goto, as then the next stage could also perform a reply. Note: The current state and DTO is serialized when invoking this method. Any changes to the state object or the DTO performed afterwards won't be present for the targeted endpoint.- Specified by:
goTo
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
endpointId
- which endpoint to go to.gotoDto
- the message that should be sent to the specified endpoint. Will in a dispatcher scenario often be the incoming DTO for the current endpoint, while in a tail call situation be the requestDto for the target endpoint.
-
goTo
public MatsInitiator.MessageReference goTo(String endpointId, Object gotoDto, Object initialTargetSto) Description copied from interface:MatsEndpoint.ProcessContext
Variation ofMatsEndpoint.ProcessContext.goTo(String, Object)
method, where the incoming state is sent along. This only makes sense if the same code base "owns" both the current endpoint, and the endpoint to which this message is sent.- Specified by:
goTo
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
endpointId
- which endpoint to go to.gotoDto
- the message that should be sent to the specified endpoint. Will in a dispatcher scenario often be the incoming DTO for the current endpoint, while in a tail call situation be the requestDto for the target endpoint.initialTargetSto
- the object which the target endpoint will get as its initial stage STO (State Transfer Object).
-
initiate
Description copied from interface:MatsEndpoint.ProcessContext
Initiates a new message out to an endpoint. This is effectively the same as invokingthe same method
on aMatsInitiator
gotten viaMatsFactory.getOrCreateInitiator(String)
, only that this way works within the transactional context of theMatsStage
which this method is invoked within. Also, the traceId and from-endpointId is predefined, but it is still recommended to set the traceId, as that will append the new string on the existing traceId, making log tracking (e.g. when debugging) better. IMPORTANT NOTICE!! TheMatsInitiator
returned fromMatsFactory.getDefaultInitiator()
is "magic" in that when employed from within a Mats Stage's context (thread), it works exactly as this method: Any initiations performed participates in the Mats Stage's transactional demarcation. Read more at the JavaDoc of default initiator'sJavaDoc
..- Specified by:
initiate
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
lambda
- provides theMatsInitiator.MatsInitiate
instance on which to create the message to be sent.
-
doAfterCommit
Description copied from interface:MatsEndpoint.ProcessContext
The Runnable will be performed after messaging and external resources (DB) have been committed. An example can be if the Mats-lambda inserts a row in a database that should be processed by some other component (i.e. a service running with some Threads), and thus wants to wake up that component telling it that new work is available. Problem is then that if this "wakeUp()" call is done within the lambda, the row is not technically there yet - as we're still within the SQL transaction demarcation. Therefore, if the process-service wakes up really fast and tries to find the new work, it will not see anything yet. (It might then presume that e.g. another node of the service-cluster took care of whatever woke it up, and go back to sleep.) Note: This is per processing; Setting it is only relevant for the current message. If you invoke the method more than once, only the last Runnable will be run. If you set it tonull
, you "cancel" any previously set Runnable. Note: If any Exception is raised from the stage lambda code after the Runnable has been set, or any Exception is raised by the processing or committing, the Runnable will not be run. Note: If thedoAfterCommit
Runnable throws aRuntimeException
, it will be logged on ERROR level, then ignored.- Specified by:
doAfterCommit
in interfaceMatsEndpoint.ProcessContext<R>
- Parameters:
runnable
- the code to run right after the transaction of both external resources and messaging has been committed. Setting tonull
"cancels" any previously set Runnable.
-
getAttribute
Description copied from interface:MatsEndpoint.ProcessContext
Provides a way to get hold of (optional) attributes/objects from the Mats implementation, either specific to the Mats implementation in use, or configured into this instance of the Mats implementation. Is mirrored by the same method atMatsInitiator.MatsInitiate.getAttribute(Class, String...)
. There is also a ThreadLocal-accessible version atMatsFactory.ContextLocal.getAttribute(Class, String...)
. Mandatory: If the Mats implementation has a transactional SQL Connection, it shall be available by'context.getAttribute(Connection.class)'
.- Specified by:
getAttribute
in interfaceMatsEndpoint.ProcessContext<R>
- Type Parameters:
T
- The type of the attribute.- Parameters:
type
- The expected type of the attributename
- The (optional) (hierarchical) name(s) of the attribute.- Returns:
- Optional of the attribute in question, the optionality pointing out that it depends on the Mats implementation or configuration whether it is available.
- See Also:
-