diff --git a/lib/mobility-core/mobility-core.cabal b/lib/mobility-core/mobility-core.cabal
index bae8825ef..6efc09db4 100644
--- a/lib/mobility-core/mobility-core.cabal
+++ b/lib/mobility-core/mobility-core.cabal
@@ -132,6 +132,7 @@ library
Kernel.External.Maps.OSRM.RoadsClient
Kernel.External.Maps.Types
Kernel.External.Maps.Utils
+ Kernel.External.MasterCloudForward
Kernel.External.MultiModal
Kernel.External.MultiModal.Interface
Kernel.External.MultiModal.Interface.Google
@@ -155,6 +156,12 @@ library
Kernel.External.Notification.PayTM.Client
Kernel.External.Notification.PayTM.Types
Kernel.External.Notification.Types
+ Kernel.External.PartnerSdk.Aarokya.Flow
+ Kernel.External.PartnerSdk.Aarokya.Types
+ Kernel.External.PartnerSdk.Interface
+ Kernel.External.PartnerSdk.Interface.Aarokya
+ Kernel.External.PartnerSdk.Interface.Types
+ Kernel.External.PartnerSdk.Types
Kernel.External.Payment.Interface
Kernel.External.Payment.Interface.Events.Types
Kernel.External.Payment.Interface.Juspay
@@ -731,6 +738,163 @@ library
, xmlbf
default-language: Haskell2010
+executable master-cloud-forward-itest
+ main-is: Main.hs
+ other-modules:
+ Paths_mobility_core
+ hs-source-dirs:
+ test-integration
+ default-extensions:
+ ConstraintKinds
+ DataKinds
+ DefaultSignatures
+ DeriveAnyClass
+ DeriveFunctor
+ DeriveGeneric
+ DuplicateRecordFields
+ ExplicitNamespaces
+ FlexibleContexts
+ FlexibleInstances
+ FunctionalDependencies
+ GADTs
+ LambdaCase
+ MultiParamTypeClasses
+ MultiWayIf
+ NamedFieldPuns
+ NoImplicitPrelude
+ OverloadedLabels
+ OverloadedStrings
+ PatternSynonyms
+ PolyKinds
+ RankNTypes
+ RecordWildCards
+ ScopedTypeVariables
+ TupleSections
+ TypeApplications
+ TypeFamilies
+ TypeOperators
+ ViewPatterns
+ BlockArguments
+ TypeSynonymInstances
+ UndecidableInstances
+ ghc-options: -fwrite-ide-info -hiedir=.hie -Wall -Wcompat -Widentities -fhide-source-paths -Werror -fplugin=RecordDotPreprocessor -Wwarn=ambiguous-fields
+ build-depends:
+ aeson
+ , aeson-casing
+ , async
+ , base >=4.7 && <5
+ , base64
+ , base64-bytestring
+ , beam-core
+ , beam-mysql
+ , beam-postgres
+ , bimap
+ , bytestring
+ , case-insensitive
+ , casing
+ , cassava
+ , cereal
+ , clickhouse-haskell
+ , clock
+ , concurrency
+ , containers
+ , cryptonite
+ , data-default-class
+ , deriving-aeson
+ , dhall
+ , directory
+ , double-conversion
+ , either
+ , esqueleto
+ , euler-hs
+ , exceptions
+ , extra
+ , fast-logger
+ , filepath
+ , fmt
+ , formatting
+ , generic-lens
+ , geojson
+ , hashable
+ , hedis
+ , hex-text
+ , hspec
+ , http-api-data
+ , http-client
+ , http-client-tls
+ , http-media
+ , http-types
+ , hw-kafka-client
+ , insert-ordered-containers
+ , jwt
+ , kleene
+ , lattices
+ , lens
+ , memory
+ , mobility-core
+ , monad-logger
+ , morpheus-graphql-client
+ , mtl
+ , network
+ , openapi3
+ , parsec
+ , passetto-client
+ , persistent
+ , persistent-postgresql
+ , postgresql-migration
+ , postgresql-simple
+ , process
+ , prometheus-client
+ , prometheus-metrics-ghc
+ , prometheus-proc
+ , random
+ , random-strings
+ , record-dot-preprocessor
+ , record-hasfield
+ , regex-compat
+ , resource-pool
+ , safe
+ , safe-exceptions
+ , safe-money
+ , scientific
+ , sequelize
+ , servant
+ , servant-client
+ , servant-client-core
+ , servant-multipart
+ , servant-multipart-api
+ , servant-multipart-client
+ , servant-openapi3
+ , servant-server
+ , singletons-th
+ , slack-web
+ , split
+ , stm
+ , string-conversions
+ , tasty
+ , tasty-hunit
+ , template-haskell
+ , text
+ , text-conversions
+ , time
+ , tinylog
+ , transformers
+ , universum
+ , unix
+ , unliftio
+ , unliftio-core
+ , unordered-containers
+ , uuid
+ , vector
+ , wai
+ , wai-app-static
+ , wai-middleware-prometheus
+ , warp
+ , xml-conduit
+ , xml-types
+ , xmlbf
+ default-language: Haskell2010
+
test-suite mobility-core-tests
type: exitcode-stdio-1.0
main-is: Main.hs
diff --git a/lib/mobility-core/package.yaml b/lib/mobility-core/package.yaml
index 49930bd41..7a15cd566 100644
--- a/lib/mobility-core/package.yaml
+++ b/lib/mobility-core/package.yaml
@@ -187,3 +187,13 @@ tests:
- test/src
dependencies:
- mobility-core
+
+executables:
+ master-cloud-forward-itest:
+ main: Main.hs
+ source-dirs:
+ - test-integration
+ dependencies:
+ - mobility-core
+ - async
+ - http-client-tls
diff --git a/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs b/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs
index f57fc4035..347837bca 100644
--- a/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs
+++ b/lib/mobility-core/src/Kernel/Beam/Connection/EnvVars.hs
@@ -70,3 +70,8 @@ getRunInMasterLTSRedisCell :: IO Bool
getRunInMasterLTSRedisCell = do
envVal <- lookupEnv "RUN_IN_MASTER_LTS_REDIS_CELL"
pure (fromMaybe False (readMaybe =<< envVal))
+
+getClusterMGetAsyncEnabled :: IO Bool
+getClusterMGetAsyncEnabled = do
+ envVal <- lookupEnv "CLUSTER_MGET_ASYNC_ENABLED"
+ pure (fromMaybe False (readMaybe =<< envVal))
diff --git a/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs
new file mode 100644
index 000000000..c63972f06
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/MasterCloudForward.hs
@@ -0,0 +1,312 @@
+{-
+ Copyright 2022-23, Juspay India Pvt Ltd
+
+ This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License
+
+ as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is
+
+ distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero
+
+ General Public License along with this program. If not, see .
+-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+-- | Generic AWS-egress forwarder. Raw HTTP passthrough — no JSON envelope
+-- on the wire. GCP-side @runThroughMasterCloud@ rewrites the request URL
+-- and adds two headers; AWS-side @forwardEgressApp@ reads those headers
+-- and replays the request from the whitelisted IP.
+module Kernel.External.MasterCloudForward
+ ( MasterCloudProxyConfig (..),
+ emptyMasterCloudProxyConfig,
+ HasMasterCloudForwarder (..),
+ ForwardError (..),
+ ForwardAPI,
+ forwardAPI,
+ runThroughMasterCloud,
+ getRunApiInMasterCloud,
+ forwardEgressApp,
+ )
+where
+
+import qualified Control.Exception as Exc
+import qualified Data.Aeson as A
+import qualified Data.ByteString.Builder as BSB
+import qualified Data.ByteString.Lazy as LBS
+import qualified Data.CaseInsensitive as CI
+import qualified Data.Sequence as Seq
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.Encoding.Error as TEE
+import qualified EulerHS.Language as L
+import EulerHS.Prelude (Free (..))
+import qualified EulerHS.Types as ET
+import Kernel.Prelude
+import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics
+import Kernel.Types.Common
+import Kernel.Types.Error.BaseError
+import Kernel.Types.Error.BaseError.HTTPError
+import Kernel.Utils.Dhall (FromDhall)
+import qualified Kernel.Utils.IOLogging as IOLog
+import Kernel.Utils.Logging
+import Kernel.Utils.Servant.BaseUrl (showBaseUrlText)
+import Kernel.Utils.Servant.Client (HasRequestId, defaultHttpManager)
+import qualified Network.HTTP.Client as Http
+import qualified Network.HTTP.Types as HTTP
+import qualified Network.HTTP.Types.URI as URI
+import qualified Network.Wai as Wai
+import Servant
+import Servant.Client.Core (ClientError)
+import qualified Servant.Client.Core as SCC
+import qualified Servant.Client.Free as SCF
+import qualified Servant.Server as SS
+import System.Environment (lookupEnv)
+
+-- Same record on both sides. GCP populates @masterUrl@ + @masterSecret@;
+-- AWS populates @masterSecret@ only. Unused fields stay 'Nothing'.
+data MasterCloudProxyConfig = MasterCloudProxyConfig
+ { masterUrl :: Maybe BaseUrl,
+ masterSecret :: Maybe Text
+ }
+ deriving (Generic, Show, FromJSON, ToJSON, FromDhall)
+
+emptyMasterCloudProxyConfig :: MasterCloudProxyConfig
+emptyMasterCloudProxyConfig = MasterCloudProxyConfig Nothing Nothing
+
+class HasMasterCloudForwarder env where
+ masterCloudProxyConfig :: env -> MasterCloudProxyConfig
+
+data ForwardError
+ = ForwardNotConfigured
+ | ForwardMissingSecret
+ | ForwardBadSecret
+ | ForwardMissingDestination
+ | ForwardInvalidUrl Text
+ | ForwardUpstreamFailure Text
+ deriving (Eq, Generic, Show, IsBecknAPIError, FromJSON, ToJSON)
+
+instanceExceptionWithParent 'HTTPException ''ForwardError
+
+instance IsBaseError ForwardError where
+ toMessage = \case
+ ForwardNotConfigured -> Just "Forwarder secret not configured on this service."
+ ForwardMissingSecret -> Just "Missing X-Forwarder-Secret header."
+ ForwardBadSecret -> Just "X-Forwarder-Secret does not match."
+ ForwardMissingDestination -> Just "Missing X-Forward-Destination header."
+ ForwardInvalidUrl u -> Just $ "Invalid X-Forward-Destination URL: " <> u
+ ForwardUpstreamFailure msg -> Just $ "Upstream forwarder failure: " <> msg
+
+instance IsHTTPError ForwardError where
+ toErrorCode = \case
+ ForwardNotConfigured -> "FORWARDER_NOT_CONFIGURED"
+ ForwardMissingSecret -> "FORWARDER_MISSING_SECRET"
+ ForwardBadSecret -> "FORWARDER_BAD_SECRET"
+ ForwardMissingDestination -> "FORWARDER_MISSING_DESTINATION"
+ ForwardInvalidUrl _ -> "FORWARDER_INVALID_URL"
+ ForwardUpstreamFailure _ -> "FORWARDER_UPSTREAM_FAILURE"
+
+ toHttpCode = \case
+ ForwardNotConfigured -> E401
+ ForwardMissingSecret -> E401
+ ForwardBadSecret -> E401
+ ForwardMissingDestination -> E400
+ ForwardInvalidUrl _ -> E400
+ ForwardUpstreamFailure _ -> E500
+
+instance IsAPIError ForwardError
+
+-- The @X-Forwarder-Secret@ and @X-Forward-Destination@ headers are
+-- intentionally not declared here: validation happens inside the WAI
+-- 'forwardEgressApp', and declaring them on the Servant API caused them
+-- to be parsed twice per request.
+type ForwardAPI =
+ "forward-egress"
+ :> Raw
+
+forwardAPI :: Proxy ForwardAPI
+forwardAPI = Proxy
+
+-- True iff @RUN_API_IN_MASTER_CLOUD=True@ in env.
+getRunApiInMasterCloud :: IO Bool
+getRunApiInMasterCloud = do
+ envVal <- lookupEnv "RUN_API_IN_MASTER_CLOUD"
+ pure (fromMaybe False (readMaybe =<< envVal))
+
+-- Drop-in replacement for @callAPI@. Triple-gated: env on + masterUrl set +
+-- masterSecret set → forwarded. Anything else → direct call.
+runThroughMasterCloud ::
+ ( HasMasterCloudForwarder r,
+ MonadReader r m,
+ MonadFlow m,
+ Log m,
+ Metrics.CoreMetrics m,
+ HasRequestId r
+ ) =>
+ BaseUrl ->
+ ET.EulerClient a ->
+ Text ->
+ m (Either ClientError a)
+runThroughMasterCloud origBaseUrl eClient desc = do
+ shouldForward <- liftIO getRunApiInMasterCloud
+ cfg <- asks masterCloudProxyConfig
+ case (shouldForward, cfg.masterUrl, cfg.masterSecret) of
+ (True, Just fwdUrl, Just secret) -> do
+ logDebug $ "MASTER_CLOUD_FORWARD: forwarding " <> desc <> " via " <> showBaseUrlText fwdUrl
+ interpretWithForwarder origBaseUrl fwdUrl secret eClient
+ _ -> do
+ logDebug $ "MASTER_CLOUD_FORWARD: direct call for " <> desc
+ L.callAPI' (Just defaultHttpManager) origBaseUrl eClient
+
+-- Walk the EulerClient free monad. Each RunRequest gets its path rewritten
+-- to /forward-egress and two headers appended; the rewritten request is
+-- dispatched against the AWS forwarder and the upstream response is fed
+-- back to the continuation.
+interpretWithForwarder ::
+ ( MonadFlow m,
+ Log m,
+ Metrics.CoreMetrics m,
+ MonadReader r m,
+ HasRequestId r
+ ) =>
+ BaseUrl ->
+ BaseUrl ->
+ Text ->
+ ET.EulerClient a ->
+ m (Either ClientError a)
+interpretWithForwarder origBaseUrl fwdUrl secret (ET.EulerClient freeClient) =
+ go freeClient
+ where
+ go (Pure a) = pure (Right a)
+ go (Free (SCF.Throw e)) = pure (Left e)
+ go (Free (SCF.RunRequest req cont)) = do
+ let pathBs = LBS.toStrict (BSB.toLazyByteString (SCC.requestPath req))
+ queryBs = URI.renderQuery True (toList (SCC.requestQueryString req))
+ basePart = TE.encodeUtf8 (showBaseUrlText origBaseUrl)
+ originalFullUrl = TE.decodeUtf8With TEE.lenientDecode (basePart <> pathBs <> queryBs)
+ let extraHeaders =
+ SCC.requestHeaders req
+ Seq.|> (CI.mk "X-Forwarder-Secret", TE.encodeUtf8 secret)
+ Seq.|> (CI.mk "X-Forward-Destination", TE.encodeUtf8 originalFullUrl)
+ r1 = req {SCC.requestPath = BSB.byteString "/forward-egress"}
+ r2 = r1 {SCC.requestQueryString = mempty}
+ newReq = r2 {SCC.requestHeaders = extraHeaders}
+ let onestep = ET.EulerClient (Free (SCF.RunRequest newReq pure))
+ result <- L.callAPI' (Just defaultHttpManager) fwdUrl onestep
+ case result of
+ Left err -> pure (Left err)
+ Right response -> go (cont response)
+
+-- AWS-side WAI handler. Validates secret, then replays the incoming request
+-- to @X-Forward-Destination@ and proxies the response back. The 'LoggerEnv'
+-- is the same one held on @AppEnv.loggerEnv@; log lines land in the standard
+-- Kibana index alongside @logInfo@/@logError@ output from the rest of the app.
+forwardEgressApp ::
+ IOLog.LoggerEnv ->
+ MasterCloudProxyConfig ->
+ Http.Manager ->
+ Wai.Application
+forwardEgressApp logEnv cfg mgr req sendResp =
+ case validateForwardRequest cfg req of
+ Left err -> do
+ IOLog.logOutputIO logEnv ERROR ("forward-egress validation failed: " <> T.pack (show err)) Nothing Nothing
+ sendResp (forwardErrorResponse err)
+ Right destReq -> proxyRequest logEnv destReq req sendResp mgr
+
+-- Verify @X-Forwarder-Secret@ matches and parse @X-Forward-Destination@.
+-- Returns the prepared upstream request, or a typed 'ForwardError'.
+validateForwardRequest ::
+ MasterCloudProxyConfig ->
+ Wai.Request ->
+ Either ForwardError Http.Request
+validateForwardRequest cfg req = do
+ let hdrs = Wai.requestHeaders req
+ look n = TE.decodeUtf8With TEE.lenientDecode <$> lookup n hdrs
+ expected <- cfg.masterSecret `orFail` ForwardNotConfigured
+ got <- look "X-Forwarder-Secret" `orFail` ForwardMissingSecret
+ when (got /= expected) $ Left ForwardBadSecret
+ dest <- look "X-Forward-Destination" `orFail` ForwardMissingDestination
+ Http.parseRequest (T.unpack dest) `orFail` ForwardInvalidUrl dest
+
+orFail :: Maybe a -> e -> Either e a
+orFail (Just a) _ = Right a
+orFail Nothing e = Left e
+
+-- Render a typed 'ForwardError' as a JSON @{ errorCode, errorMessage }@
+-- response with the status code from 'toHttpCode'. Same envelope as the rest
+-- of the Kernel error machinery.
+forwardErrorResponse :: ForwardError -> Wai.Response
+forwardErrorResponse err =
+ Wai.responseLBS
+ (httpCodeToStatus (toHttpCode err))
+ [(CI.mk "Content-Type", "application/json")]
+ ( A.encode $
+ A.object
+ [ "errorCode" A..= toErrorCode err,
+ "errorMessage" A..= fromMaybe "" (toMessage err)
+ ]
+ )
+
+-- 'HttpCode' lives in the Kernel error stack; @http-types@ uses 'HTTP.Status'.
+-- 'toServerError' bridges them via Servant's @ServerError@ which carries an
+-- 'Int' code.
+httpCodeToStatus :: HttpCode -> HTTP.Status
+httpCodeToStatus c =
+ let se = toServerError c
+ in HTTP.mkStatus (SS.errHTTPCode se) (TE.encodeUtf8 (T.pack (SS.errReasonPhrase se)))
+
+proxyRequest ::
+ IOLog.LoggerEnv ->
+ Http.Request ->
+ Wai.Request ->
+ (Wai.Response -> IO Wai.ResponseReceived) ->
+ Http.Manager ->
+ IO Wai.ResponseReceived
+proxyRequest logEnv destReq0 incoming sendResp mgr = do
+ bodyBytes <- Wai.strictRequestBody incoming
+ let destReq = buildDestRequest destReq0 incoming bodyBytes
+ result <- Exc.try @Exc.SomeException (Http.httpLbs destReq mgr)
+ case result of
+ Left e -> do
+ let decode = TE.decodeUtf8With TEE.lenientDecode
+ destUrl =
+ fromMaybe (decode (Http.host destReq <> Http.path destReq <> Http.queryString destReq)) $
+ decode <$> lookup "X-Forward-Destination" (Wai.requestHeaders incoming)
+ IOLog.logOutputIO
+ logEnv
+ ERROR
+ ("forward-egress upstream FAILED " <> destUrl <> " err=" <> T.pack (show e))
+ Nothing
+ Nothing
+ Right _ -> pure ()
+ sendResp (toWaiResponse result)
+
+-- Copy method/body/headers from the incoming WAI request onto the parsed
+-- destination http-client request, dropping hop-by-hop headers and the two
+-- forwarder-internal headers.
+buildDestRequest :: Http.Request -> Wai.Request -> LBS.ByteString -> Http.Request
+buildDestRequest base incoming body =
+ let h = filter notHopByHop (Wai.requestHeaders incoming)
+ r1 = base {Http.method = Wai.requestMethod incoming}
+ r2 = r1 {Http.requestBody = Http.RequestBodyLBS body}
+ in r2 {Http.requestHeaders = h}
+ where
+ notHopByHop (n, _) =
+ n /= "Host"
+ && n /= "Content-Length"
+ && n /= "X-Forwarder-Secret"
+ && n /= "X-Forward-Destination"
+
+-- Translate an http-client outcome into a WAI response. http-client.httpLbs
+-- auto-decompresses gzip, so we strip Content-Encoding / Content-Length /
+-- Transfer-Encoding from the upstream headers to keep the body consistent.
+toWaiResponse :: Either Exc.SomeException (Http.Response LBS.ByteString) -> Wai.Response
+toWaiResponse = \case
+ Left e ->
+ forwardErrorResponse (ForwardUpstreamFailure (T.pack (show e)))
+ Right resp ->
+ let safeHeaders = filter (not . isStrippable . fst) (Http.responseHeaders resp)
+ in Wai.responseLBS (Http.responseStatus resp) safeHeaders (Http.responseBody resp)
+ where
+ isStrippable n = n == "Content-Encoding" || n == "Content-Length" || n == "Transfer-Encoding"
diff --git a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs
index 85c4c93d5..65ae55aa9 100644
--- a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs
+++ b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs
@@ -44,8 +44,8 @@ notifyPerson cfg notificationData = Hedis.runInMasterCloudRedisCell $ do
[startUuid, midOneUuid, _, _] -> T.intercalate "-" [startUuid, midOneUuid]
_ -> notificationData.streamId
let object = NotificationMessage notificationStreamId now
- _ <- Hedis.withCrossAppRedis $ Hedis.publish "active-notification" object
void $ Hedis.withCrossAppRedis $ Hedis.xAddExp ("N" <> notificationStreamId <> "{" <> (show shardId) <> "}") "*" (buildFieldValue notificationData now) cfg.streamExpirationTime
+ void $ Hedis.withCrossAppRedis $ Hedis.publish "active-notification" object
where
buildFieldValue notifData createdAt =
[ ("entity.id", TE.encodeUtf8 notifData.entityId),
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs
new file mode 100644
index 000000000..afc93e492
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Flow.hs
@@ -0,0 +1,32 @@
+module Kernel.External.PartnerSdk.Aarokya.Flow where
+
+import EulerHS.Types as Euler
+import Kernel.External.PartnerSdk.Aarokya.Types
+import Kernel.Prelude
+import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics
+import Kernel.Types.Error
+import Kernel.Utils.Common
+import Servant hiding (throwError)
+
+type GenerateTokenAPI =
+ "auth"
+ :> "token"
+ :> Header "Authorization" Text
+ :> ReqBody '[JSON] AarokyaTokenRequest
+ :> Post '[JSON] AarokyaTokenResponse
+
+generateToken ::
+ (Metrics.CoreMetrics m, MonadFlow m, HasRequestId r, MonadReader r m) =>
+ BaseUrl ->
+ Text ->
+ AarokyaTokenRequest ->
+ m AarokyaTokenResponse
+generateToken url basicToken request = do
+ let proxy = Proxy @GenerateTokenAPI
+ eulerClient = Euler.client proxy (Just ("Basic " <> basicToken)) request
+ callAarokyaAPI url eulerClient "aarokya-generate-token" proxy
+
+callAarokyaAPI :: (MonadFlow m, HasRequestId r, MonadReader r m) => CallAPI' m r api res res
+callAarokyaAPI url eulerClient description proxy = do
+ callAPI url eulerClient description proxy
+ >>= fromEitherM (\err -> InternalError $ "Failed to call " <> description <> " API: " <> show err)
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs
new file mode 100644
index 000000000..e62387cb4
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Aarokya/Types.hs
@@ -0,0 +1,28 @@
+module Kernel.External.PartnerSdk.Aarokya.Types where
+
+import Kernel.External.Encryption
+import Kernel.Prelude
+
+data AarokyaIdProof = AarokyaIdProof
+ { proof_type :: Text,
+ number :: Text
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+data AarokyaTokenRequest = AarokyaTokenRequest
+ { phone_country_code :: Text,
+ phone_number :: Text,
+ id_proof :: AarokyaIdProof
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+newtype AarokyaTokenResponse = AarokyaTokenResponse
+ { access_token :: Text
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+data AarokyaSdkConfig = AarokyaSdkConfig
+ { url :: BaseUrl,
+ basicToken :: EncryptedField 'AsEncrypted Text
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs
new file mode 100644
index 000000000..ee97b04d7
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface.hs
@@ -0,0 +1,22 @@
+module Kernel.External.PartnerSdk.Interface
+ ( module Reexport,
+ module Kernel.External.PartnerSdk.Interface,
+ )
+where
+
+import qualified Kernel.External.PartnerSdk.Interface.Aarokya as Aarokya
+import Kernel.External.PartnerSdk.Interface.Types
+import Kernel.External.PartnerSdk.Types as Reexport
+import Kernel.Tools.Metrics.CoreMetrics (CoreMetrics)
+import Kernel.Utils.Common
+
+generateToken ::
+ ( EncFlow m r,
+ CoreMetrics m,
+ HasRequestId r
+ ) =>
+ PartnerSdkConfig ->
+ GenerateTokenReq ->
+ m GenerateTokenResp
+generateToken serviceConfig req = case serviceConfig of
+ AarokyaPartnerSdkConfig cfg -> Aarokya.generateToken cfg req
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs
new file mode 100644
index 000000000..fa01f5743
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Aarokya.hs
@@ -0,0 +1,45 @@
+module Kernel.External.PartnerSdk.Interface.Aarokya where
+
+import Kernel.External.Encryption
+import qualified Kernel.External.PartnerSdk.Aarokya.Flow as Aarokya
+import qualified Kernel.External.PartnerSdk.Aarokya.Types as AarokyaTypes
+import Kernel.External.PartnerSdk.Interface.Types
+import Kernel.Prelude
+import qualified Kernel.Tools.Metrics.CoreMetrics as Metrics
+import Kernel.Utils.Common
+
+generateToken ::
+ ( Metrics.CoreMetrics m,
+ EncFlow m r,
+ HasRequestId r,
+ MonadReader r m
+ ) =>
+ AarokyaTypes.AarokyaSdkConfig ->
+ GenerateTokenReq ->
+ m GenerateTokenResp
+generateToken config req = do
+ basicToken <- decrypt config.basicToken
+ let aarokyaReq = toAarokyaTokenRequest req
+ resp <- Aarokya.generateToken config.url basicToken aarokyaReq
+ pure $ fromAarokyaTokenResponse resp
+
+toAarokyaTokenRequest :: GenerateTokenReq -> AarokyaTypes.AarokyaTokenRequest
+toAarokyaTokenRequest req =
+ AarokyaTypes.AarokyaTokenRequest
+ { phone_country_code = req.phoneCountryCode,
+ phone_number = req.phoneNumber,
+ id_proof = toAarokyaIdProof req.idProof
+ }
+
+toAarokyaIdProof :: IdProof -> AarokyaTypes.AarokyaIdProof
+toAarokyaIdProof p =
+ AarokyaTypes.AarokyaIdProof
+ { proof_type = p.proofType,
+ number = p.number
+ }
+
+fromAarokyaTokenResponse :: AarokyaTypes.AarokyaTokenResponse -> GenerateTokenResp
+fromAarokyaTokenResponse resp =
+ GenerateTokenResp
+ { accessToken = resp.access_token
+ }
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs
new file mode 100644
index 000000000..0c6d21b68
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Interface/Types.hs
@@ -0,0 +1,26 @@
+module Kernel.External.PartnerSdk.Interface.Types where
+
+import qualified Kernel.External.PartnerSdk.Aarokya.Types as Aarokya
+import Kernel.Prelude
+
+data IdProof = IdProof
+ { proofType :: Text,
+ number :: Text
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+data GenerateTokenReq = GenerateTokenReq
+ { phoneCountryCode :: Text,
+ phoneNumber :: Text,
+ idProof :: IdProof
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+newtype GenerateTokenResp = GenerateTokenResp
+ { accessToken :: Text
+ }
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
+
+data PartnerSdkConfig
+ = AarokyaPartnerSdkConfig Aarokya.AarokyaSdkConfig
+ deriving (Show, Eq, Generic, ToJSON, FromJSON)
diff --git a/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs b/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs
new file mode 100644
index 000000000..93435c1a6
--- /dev/null
+++ b/lib/mobility-core/src/Kernel/External/PartnerSdk/Types.hs
@@ -0,0 +1,22 @@
+{-# LANGUAGE TemplateHaskell #-}
+
+module Kernel.External.PartnerSdk.Types where
+
+import Data.Aeson.Types
+import EulerHS.Prelude
+import Kernel.Beam.Lib.UtilsTH (mkBeamInstancesForEnumAndList)
+import Kernel.Storage.Esqueleto (derivePersistField)
+
+data PartnerSdkService = Aarokya
+ deriving (Show, Read, Eq, Ord, Generic)
+
+$(mkBeamInstancesForEnumAndList ''PartnerSdkService)
+derivePersistField "PartnerSdkService"
+
+instance FromJSON PartnerSdkService where
+ parseJSON (String "Aarokya") = pure Aarokya
+ parseJSON (String _) = parseFail "Expected \"Aarokya\""
+ parseJSON e = typeMismatch "String" e
+
+instance ToJSON PartnerSdkService where
+ toJSON = String . show
diff --git a/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs b/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs
index c1fa4ed0d..e2334bd28 100644
--- a/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs
+++ b/lib/mobility-core/src/Kernel/External/Payment/Interface/Juspay.hs
@@ -777,15 +777,15 @@ offerList config mRoutingId req = do
mkOfferListReq :: OfferListReq -> Juspay.OfferListReq
mkOfferListReq OfferListReq {..} =
Juspay.OfferListReq
- { order = mkOfferOrder order planId registrationDate dutyDate paymentMode numOfRides offerListingMetric,
+ { order = mkOfferOrder order planId registrationDate dutyDate paymentMode numOfRides offerListingMetric membershipStatus,
payment_method_info = [],
customer = mkOfferCustomer <$> customer,
offer_code = Nothing
}
-mkOfferOrder :: OfferOrder -> Text -> UTCTime -> UTCTime -> Text -> Int -> Maybe UDF6 -> Juspay.OfferOrder
+mkOfferOrder :: OfferOrder -> Text -> UTCTime -> UTCTime -> Text -> Int -> Maybe UDF6 -> Maybe UDF9 -> Juspay.OfferOrder
---- add duty day and payment mode respectively in holes ----
-mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfRides offerListingMetric =
+mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfRides offerListingMetric membershipStatus =
Juspay.OfferOrder
{ order_id = orderId,
amount = show amount,
@@ -798,6 +798,7 @@ mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfR
let strNumRides = show numOfRides
if strNumRides == "-1" then "DEFAULT" else strNumRides,
udf6 = parseUDF6 <$> offerListingMetric,
+ udf9 = parseUDF9 <$> membershipStatus,
basket = decodeUtf8 . A.encode <$> basket
}
where
@@ -805,6 +806,7 @@ mkOfferOrder OfferOrder {..} planId registrationDate dutyDate paymentMode numOfR
case offerListingMetric' of
LIST_BASED_ON_DATE listingDates -> pack $ formatTime defaultTimeLocale "%d_%m_%y" listingDates
_ -> show offerListingMetric'
+ parseUDF9 (MembershipStatus isMember) = if isMember then "TRUE" else "FALSE"
mkOfferCustomer :: OfferCustomer -> Juspay.OfferCustomer
mkOfferCustomer OfferCustomer {..} = Juspay.OfferCustomer {id = customerId, email, mobile}
diff --git a/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs b/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs
index 2fbb94230..96709fd18 100644
--- a/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs
+++ b/lib/mobility-core/src/Kernel/External/Payment/Interface/Types.hs
@@ -469,12 +469,16 @@ data OfferListReq = OfferListReq
dutyDate :: UTCTime,
paymentMode :: Text,
numOfRides :: Int,
- offerListingMetric :: Maybe UDF6
+ offerListingMetric :: Maybe UDF6,
+ membershipStatus :: Maybe UDF9
}
data UDF6 = IS_VISIBLE | IS_APPLICABLE | LIST_BASED_ON_DATE UTCTime
deriving stock (Show, Eq, Generic, Read)
+data UDF9 = MembershipStatus Bool
+ deriving stock (Show, Eq, Generic, Read)
+
data OfferOrder = OfferOrder
{ orderId :: Maybe Text,
amount :: HighPrecMoney,
diff --git a/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs b/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs
index 208614d95..537a39405 100644
--- a/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs
+++ b/lib/mobility-core/src/Kernel/External/Payment/Juspay/Types/Offer.hs
@@ -43,6 +43,7 @@ data OfferOrder = OfferOrder
udf4 :: Text,
udf5 :: Text,
udf6 :: Maybe Text,
+ udf9 :: Maybe Text,
basket :: Maybe Text
}
deriving stock (Show, Generic)
diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
index 46c17e9c8..f9bcc4fd6 100644
--- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
+++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE BangPatterns #-}
+
{-
Copyright 2022-23, Juspay India Pvt Ltd
@@ -17,16 +19,25 @@ module Kernel.Storage.Hedis.Queries (module Reexport, module Kernel.Storage.Hedi
import qualified Data.Aeson as Ae
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
+import Data.Either (partitionEithers)
+import Data.Hashable (Hashable, hash)
+import qualified Data.IntMap.Strict as IntMap
+import qualified Data.List as DL
+import qualified Data.List.NonEmpty as NE
+import qualified Data.Map.Strict as Map
import Data.String.Conversions
-import Data.Text hiding (concatMap, map, null)
+import Data.Text hiding (any, chunksOf, concat, concatMap, length, map, null, replicate, zip)
import qualified Data.Text as T
import qualified Data.Text as Text
+import qualified Data.Vector as V
+import Database.Redis (keyToSlot)
import Database.Redis as Reexport (GeoBy (..), GeoFrom (..), Queued, Redis, RedisTx, Reply, TxResult (..))
import qualified Database.Redis as Hedis
import qualified Database.Redis.Cluster as Cluster
+import qualified EulerHS.Language as L
import EulerHS.Prelude (whenLeft)
import GHC.Records.Extra
-import Kernel.Beam.Connection.EnvVars (getRunInMasterCloudRedisCell, getRunInMasterLTSRedisCell)
+import Kernel.Beam.Connection.EnvVars (getClusterMGetAsyncEnabled, getRunInMasterCloudRedisCell, getRunInMasterLTSRedisCell)
import Kernel.Prelude
import Kernel.Storage.Hedis.Config
import Kernel.Storage.Hedis.Error
@@ -176,7 +187,7 @@ runInMultiCloudRedisMaybeResult action = do
case primaryResult of
Just _ -> pure primaryResult -- Primary has result, return immediately
Nothing -> do
- logError $ "SECONDARY_CLUSTER: Primary returned Nothing, trying secondary"
+ logWarning $ "SECONDARY_CLUSTER: Primary returned Nothing, trying secondary"
-- Primary returned Nothing, try secondary
secondaryResult <-
withTryCatch "runInMultiCloudRedisMaybeResult" $
@@ -360,6 +371,167 @@ safeGet ::
(FromJSON a, HedisFlow m env, TryException m) => Text -> m (Maybe a)
safeGet key = get' key (del key)
+-- | Internal: cluster MGET returning one Vector slot per input key, aligned
+-- by index. Hits → @Just bs@; misses, Redis errors, fork failures all → @Nothing@.
+-- Groups by hash slot (Cluster requires single-slot MGETs); when async is
+-- enabled, runs forks in capped chunks to bound parallelism.
+mGetClusterRaw ::
+ forall m env.
+ (HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) =>
+ [Text] ->
+ m (V.Vector (Maybe BS.ByteString))
+mGetClusterRaw [] = pure V.empty
+mGetClusterRaw keys = withLogTag "CLUSTER" $ do
+ let !nKeys = length keys
+ prefKeys <- mapM buildKey keys
+ let !groups =
+ IntMap.elems $
+ IntMap.fromListWith
+ (<>)
+ [(fromEnum (keyToSlot pk), NE.singleton (i, pk)) | (i, pk) <- zip [0 ..] prefKeys]
+ asyncEnabled <- liftIO getClusterMGetAsyncEnabled
+ results <-
+ if asyncEnabled && length groups > 1
+ then concat <$> traverse runChunk (chunksOf clusterMGetForkLimit groups)
+ else traverse runGroup groups
+ pure $! V.replicate nKeys Nothing V.// concat results
+ where
+ runGroup :: NonEmpty (Int, BS.ByteString) -> m [(Int, Maybe BS.ByteString)]
+ runGroup grp = do
+ let (idxs, prefKs) = NE.unzip grp
+ missForGroup = map (,Nothing) (NE.toList idxs)
+ result <-
+ withTimeRedis "RedisCluster" "mget" $
+ withTryCatch "mGetCluster" $
+ runHedisEither $ Hedis.mget (NE.toList prefKs)
+ case result of
+ Left exc -> do
+ logTagError "ERROR_WHILE_MGET" $ "Cluster MGET threw: " <> show exc
+ pure missForGroup
+ Right (Left reply) -> do
+ logTagError "ERROR_WHILE_MGET" $ "Cluster MGET failed: " <> show reply
+ pure missForGroup
+ Right (Right listBS) ->
+ -- Defensive: tolerate a malformed Redis reply that disagrees on length.
+ let !padded = DL.take (NE.length grp) (listBS <> DL.repeat Nothing)
+ in pure $ DL.zip (NE.toList idxs) padded
+
+ runChunk :: [NonEmpty (Int, BS.ByteString)] -> m [[(Int, Maybe BS.ByteString)]]
+ runChunk chunk = do
+ awaitables <- forM (DL.zip [0 :: Int ..] chunk) $ \(j, grp) ->
+ (grp,) <$> awaitableFork ("mGetCluster:" <> show j) (runGroup grp)
+ forM awaitables $ \(grp, aw) -> do
+ res <- L.await Nothing aw
+ case res of
+ Right xs -> pure xs
+ Left err -> do
+ logTagError "ERROR_WHILE_MGET_FORK" $ "Concurrent fork failed: " <> show err
+ pure $ map ((,Nothing) . fst) (NE.toList grp)
+
+ chunksOf :: Int -> [a] -> [[a]]
+ chunksOf k = DL.unfoldr step
+ where
+ step [] = Nothing
+ step xs = Just (DL.splitAt k xs)
+
+-- | Cluster MGET returning just the decoded values that were found. No
+-- order guarantee on the output. Decode failures are logged and dropped.
+mGetCluster ::
+ (FromJSON a, HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) =>
+ [Text] ->
+ m [a]
+mGetCluster keys = do
+ raw <- mGetClusterRaw keys
+ catMaybes . V.toList <$> V.mapM decodeBytesLogging raw
+
+-- | Cluster MGET returning (key, value) pairs in input-key order. Decode
+-- failures are logged (with the offending key) and dropped; the key is
+-- left in Redis for the writer to fix or overwrite.
+mGetClusterWithKeys ::
+ (FromJSON a, HedisFlow m env, TryException m, L.MonadFlow m, Forkable m) =>
+ [Text] ->
+ m [(Text, a)]
+mGetClusterWithKeys keys = do
+ raw <- mGetClusterRaw keys
+ catMaybes . V.toList
+ <$> V.zipWithM decodeKeyedPair (V.fromList keys) raw
+
+-- | Internal: standalone MGET returning a Vector aligned by input index.
+-- Hits → @Just bs@; misses and Redis errors → @Nothing@.
+mGetStandaloneRaw ::
+ (HedisFlow m env, TryException m) =>
+ [Text] ->
+ m (V.Vector (Maybe BS.ByteString))
+mGetStandaloneRaw [] = pure V.empty
+mGetStandaloneRaw keys = withLogTag "STANDALONE" $ do
+ let !nKeys = length keys
+ prefKeys <- mapM buildKey keys
+ result <-
+ withTimeRedis "RedisStandalone" "mget" $
+ withTryCatch "mGetStandalone" $
+ runHedisEither' $ Hedis.mget prefKeys
+ case result of
+ Left exc -> do
+ logTagError "ERROR_WHILE_MGET" $ "Standalone MGET threw: " <> show exc
+ pure $ V.replicate nKeys Nothing
+ Right (Left reply) -> do
+ logTagError "ERROR_WHILE_MGET" $ "Standalone MGET failed: " <> show reply
+ pure $ V.replicate nKeys Nothing
+ Right (Right listBS) ->
+ pure $ V.fromListN nKeys (DL.take nKeys (listBS <> DL.repeat Nothing))
+
+-- | Standalone MGET returning just the decoded values that were found.
+mGetStandalone :: (FromJSON a, HedisFlow m env, TryException m) => [Text] -> m [a]
+mGetStandalone keys = do
+ raw <- mGetStandaloneRaw keys
+ catMaybes . V.toList <$> V.mapM decodeBytesLogging raw
+
+-- | Standalone MGET returning (key, value) pairs in input-key order. Decode
+-- failures are logged (with the offending key) and dropped.
+mGetStandaloneWithKeys ::
+ (FromJSON a, HedisFlow m env, TryException m) =>
+ [Text] ->
+ m [(Text, a)]
+mGetStandaloneWithKeys keys = do
+ raw <- mGetStandaloneRaw keys
+ catMaybes . V.toList
+ <$> V.zipWithM decodeKeyedPair (V.fromList keys) raw
+
+-- | Decode raw bytes; on failure log and return Nothing. Used by the
+-- values-only paths where the key isn't tracked.
+decodeBytesLogging ::
+ (FromJSON a, HedisFlow m env, TryException m) =>
+ Maybe BS.ByteString ->
+ m (Maybe a)
+decodeBytesLogging Nothing = pure Nothing
+decodeBytesLogging (Just bs) = case Ae.eitherDecode (BSL.fromStrict bs) of
+ Right v -> pure (Just v)
+ Left e -> do
+ logTagError "REDIS" $ "MGet decode failure: " <> cs e
+ pure Nothing
+
+-- | Decode bytes paired with their originating key. On bad JSON, log with the
+-- key for traceability and drop the entry — does NOT delete the key from Redis.
+decodeKeyedPair ::
+ (FromJSON a, HedisFlow m env, TryException m) =>
+ Text ->
+ Maybe BS.ByteString ->
+ m (Maybe (Text, a))
+decodeKeyedPair _ Nothing = pure Nothing
+decodeKeyedPair k (Just bs) = case Ae.eitherDecode (BSL.fromStrict bs) of
+ Right v -> pure (Just (k, v))
+ Left e -> do
+ logTagError "REDIS" $ "MGet decode failure for key " <> k <> ": " <> cs e
+ pure Nothing
+
+decodeMGetResult ::
+ (FromJSON a, HedisFlow m env, TryException m) =>
+ Text ->
+ Maybe BS.ByteString ->
+ m (Maybe a)
+decodeMGetResult _ Nothing = pure Nothing
+decodeMGetResult key (Just bs) = decodeJSONWithErrorHandler key bs (del key)
+
set ::
(ToJSON a, HedisFlow m env, TryException m) => Text -> a -> m ()
set key val = withLogTag "Redis" $ do
@@ -1125,6 +1297,11 @@ geoSearch key from by = withLogTag "Redis" $ do
pure [] -- Return an empty list if there was an error
Right items -> pure items
+-- | Hard cap on concurrent forks so a request with keys spanning many slots
+-- can't unbounded-fan-out into a fork storm.
+clusterMGetForkLimit :: Int
+clusterMGetForkLimit = 32
+
geoSearchDecoded ::
(FromJSON a, HedisFlow m env, TryException m) =>
Text ->
@@ -1187,3 +1364,117 @@ sAdd key members = withLogTag "Redis" $ do
\err ->
withLogTag "CLUSTER" $
logTagInfo "FAILED_TO_SADD" (show err)
+
+-- =============================================================================
+-- Cluster-aware bulk Redis. Groups items by their cluster hash slot
+-- (CRC16 → 0..16383) so every command targeting a single shard runs inside
+-- ONE 'runRedis' block: hedis pipelines automatically inside a Redis monad
+-- action, so N commands cost one TCP round-trip per shard. Shards run in
+-- parallel via awaitableFork+await (same pattern as Domain.Utils.mapConcurrently),
+-- so total wallclock is (slowest shard), not (sum of all shards).
+--
+-- Same prefix semantics as the rest of the wrapper: keys go through
+-- 'buildKey' (which reads 'hedisEnv.keyModifier'), so callers wrapping in
+-- 'withCrossAppRedis' get raw keys; without it, the configured app prefix
+-- applies. Either way, writes via these helpers are interchangeable with
+-- reads via 'get' / 'setExp' / etc.
+--
+-- Two flavours:
+--
+-- 1. 'bulkShardedRedis' — per-item action. Use when each command operates on
+-- one key (GET / SET / SETEX / SETNX / DEL / EXPIRE / TTL / EXISTS / INCR
+-- / DECR / HGET / HSET / SADD / ZADD / LPUSH / RPUSH / etc.). Action
+-- receives the original item plus the already-prefixed key bytes — same
+-- shape as 'runWithPrefix'\'s action.
+--
+-- 2. 'bulkShardedRedisBatch' — per-shard batch action receiving the bucket
+-- of items for one slot. Use for multi-key single commands — every key
+-- in a bucket already shares a slot by construction, so cluster's
+-- same-slot rule for these commands is satisfied: MGET / MSET / MSETNX /
+-- SUNION / SINTER / SDIFF / DEL [k1,k2..] / EXISTS [k1,k2..] / TOUCH /
+-- UNLINK / ZUNIONSTORE / etc. Returns per-shard results concatenated;
+-- pair items with results inside the action ('zip' inputs with reply)
+-- if you need to align them back to input order.
+--
+-- Out of scope: MULTI/EXEC transactions — use 'runHedisTransaction' instead.
+--
+-- Standalone (non-clustered) Redis: 'keyToSlot' still buckets keys but every
+-- bucket resolves to the same connection, so grouping is harmless overhead.
+-- =============================================================================
+
+-- | Produce a Redis-cluster hash-tag segment "{shard-N}" that buckets the input
+-- into one of 'shards' synthetic slots. Embedding it anywhere in a Redis key
+-- forces all keys sharing a bucket onto the same cluster slot (Redis hashes
+-- the first '{...}' segment), which is the precondition for the bulk
+-- helpers below to actually pipeline a batch.
+--
+-- Tuning: 8–32 is the sweet spot for batches of 5–50 on 3–6 cluster shards.
+-- Smaller (1–4) risks write hotspots; larger (256+) erodes the pipelining
+-- win because per-batch slot count approaches batch size.
+shardHashTag :: Hashable a => Int -> a -> Text
+shardHashTag shards x =
+ let n = max 1 shards
+ in "{shard-" <> show (hash x `mod` n) <> "}"
+
+-- | Per-call key cap inside a single shard. A shard's items are split into
+-- batches of this size and the caller's action runs once per batch, so a
+-- single MGET/MSET never balloons past this many keys.
+bulkShardBatchSize :: Int
+bulkShardBatchSize = 100
+
+bulkShardedRedisBatch ::
+ (HedisFlow m env, TryException m, Forkable m, L.MonadFlow m) =>
+ -- | extracts the raw (un-prefixed) routing key from each item
+ (a -> Text) ->
+ -- | per-shard action — invoked once per slot. All items in the input list
+ -- share a cluster slot, so the caller should use a multi-key wrapper
+ -- ('mGetCluster', 'mSet', etc.) to get one pipelined round-trip per shard.
+ ([a] -> m [b]) ->
+ [a] ->
+ m [b]
+bulkShardedRedisBatch _ _ [] = pure []
+bulkShardedRedisBatch keyOf shardOp items = do
+ slotted <- forM items $ \x -> do
+ keyBs <- buildKey (keyOf x)
+ let slot = fromIntegral (Hedis.keyToSlot keyBs) :: Int
+ pure (slot, x)
+ let groups = Map.elems $ Kernel.Prelude.foldl' insertItem Map.empty slotted
+ Kernel.Prelude.concat <$> traverse runChunk (chunksOf clusterMGetForkLimit groups)
+ where
+ insertItem acc (slot, x) = Map.insertWith (++) slot [x] acc
+ runShard shardItems = Kernel.Prelude.concat <$> mapM shardOp (chunksOf bulkShardBatchSize shardItems)
+ runChunk chunk = do
+ awaitables <- mapM (awaitableFork "bulkShardedRedisBatch" . runShard) chunk
+ results <- mapM (L.await Nothing) awaitables
+ case partitionEithers results of
+ ([], successes) -> pure (Kernel.Prelude.concat successes)
+ (err : _, _) ->
+ Error.throwError $ HedisReplyError $ "bulkShardedRedisBatch shard fork failed: " <> show err
+ chunksOf k = DL.unfoldr step
+ where
+ step [] = Nothing
+ step xs = Just (DL.splitAt k xs)
+
+-- | Bulk SET-with-TTL via a single Lua script — one Redis command per call,
+-- atomic on the server side. Designed to be passed as the per-shard action to
+-- 'bulkShardedRedisBatch': all input pairs MUST hash to the same cluster slot
+-- (use 'shardHashTag' to bucket), otherwise cluster will return a CROSSSLOT
+-- error. Reply errors propagate via 'runHedis'.
+-- Use it for less number of keys per call (up to ~100) to get the pipelining and atomicity benefits;
+setExpMany ::
+ (HedisFlow m env, TryException m, ToJSON a) =>
+ ExpirationTime ->
+ [(Text, a)] ->
+ m ()
+setExpMany _ [] = pure ()
+setExpMany expirationTime pairs = withTimeRedis "RedisCluster" "setExpMany" . withLogTag "Redis" $ do
+ prefKeys <- mapM (buildKey . fst) pairs
+ let vals = map (BSL.toStrict . Ae.encode . snd) pairs
+ ttlArg = cs (show (toInteger expirationTime) :: String) :: BS.ByteString
+ script =
+ "local ttl = tonumber(ARGV[1]) "
+ <> "for i = 1, #KEYS do "
+ <> "redis.call('SET', KEYS[i], ARGV[i + 1], 'EX', ttl) "
+ <> "end "
+ <> "return 'OK'"
+ void . runHedis $ (Hedis.eval script prefKeys (ttlArg : vals) :: Redis (Either Reply Reply))
diff --git a/lib/mobility-core/test-integration/Main.hs b/lib/mobility-core/test-integration/Main.hs
new file mode 100644
index 000000000..5c1a809e1
--- /dev/null
+++ b/lib/mobility-core/test-integration/Main.hs
@@ -0,0 +1,225 @@
+{-
+ Copyright 2022-23, Juspay India Pvt Ltd
+
+ This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License
+
+ as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is
+
+ distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero
+
+ General Public License along with this program. If not, see .
+-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE DerivingStrategies #-}
+{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TypeApplications #-}
+{-# LANGUAGE TypeOperators #-}
+
+-- Real-world parity test: GET https://jsonplaceholder.typicode.com/todos/1
+-- via plain `callAPI` and via `runThroughMasterCloud` (through a local
+-- forwarder), then assert both decoded `Todo` values are identical.
+module Main (main) where
+
+import qualified Control.Concurrent.Async as Async
+import qualified Control.Exception as Exc
+import qualified Data.HashMap.Strict as HM
+import Data.IORef
+import qualified Data.Text as T
+import qualified EulerHS.Runtime as R
+import qualified EulerHS.Types as ET
+import Kernel.External.MasterCloudForward
+ ( HasMasterCloudForwarder (..),
+ MasterCloudProxyConfig (..),
+ forwardEgressApp,
+ runThroughMasterCloud,
+ )
+import Kernel.Prelude
+import qualified Kernel.Streaming.Kafka.Producer.Types as Kafka
+import qualified Kernel.Tools.Metrics.CoreMetrics.Types as Metrics
+import qualified Kernel.Types.Flow as KFlow
+import Kernel.Types.Logging (LogLevel (..), LoggerConfig (..))
+import qualified Kernel.Utils.IOLogging as IOLogging
+import Kernel.Utils.Servant.Client (callAPI)
+import qualified Network.HTTP.Client as Http
+import qualified Network.HTTP.Client.TLS as HttpTLS
+import qualified Network.Wai.Handler.Warp as Warp
+import Servant
+import qualified Servant.Client as SC
+import System.Environment (setEnv)
+import System.Exit (ExitCode (..), exitWith)
+import qualified Prelude as P
+
+-- Mirrors the real /todos/{id} response shape.
+data Todo = Todo
+ { userId :: Int,
+ id :: Int,
+ title :: Text,
+ completed :: Bool
+ }
+ deriving stock (Generic, Eq, Show)
+ deriving anyclass (FromJSON, ToJSON)
+
+type TodoAPI = "todos" :> Capture "id" Int :> Get '[JSON] Todo
+
+todoAPI :: Proxy TodoAPI
+todoAPI = Proxy
+
+getTodoClient :: Int -> ET.EulerClient Todo
+getTodoClient = ET.client todoAPI
+
+-- Minimal AppEnv carrying the forwarder config plus the fields
+-- MonadFlow / HasARTFlow / HasCoreMetrics demand on r.
+data TestEnv = TestEnv
+ { teProxyConfig :: MasterCloudProxyConfig,
+ requestId :: Maybe Text,
+ sessionId :: Maybe Text,
+ loggerEnv :: IOLogging.LoggerEnv,
+ shouldLogRequestId :: Bool,
+ kafkaProducerForART :: Maybe Kafka.KafkaProducerTools,
+ coreMetrics :: Metrics.CoreMetricsContainer,
+ version :: Metrics.DeploymentVersion,
+ url :: Maybe Text
+ }
+ deriving (Generic)
+
+instance HasMasterCloudForwarder TestEnv where
+ masterCloudProxyConfig = teProxyConfig
+
+forwarderPort :: Int
+forwarderPort = 9101
+
+mkBaseUrl :: SC.Scheme -> P.String -> Int -> P.String -> BaseUrl
+mkBaseUrl scheme host port path =
+ BaseUrl
+ { baseUrlScheme = scheme,
+ baseUrlHost = host,
+ baseUrlPort = port,
+ baseUrlPath = path
+ }
+
+silentLoggerConfig :: LoggerConfig
+silentLoggerConfig =
+ LoggerConfig
+ { level = ERROR,
+ logToFile = False,
+ logFilePath = "/tmp/master-cloud-forward-itest.log",
+ logToConsole = False,
+ logRawSql = False,
+ prettyPrinting = False
+ }
+
+bootstrapEnv ::
+ IOLogging.LoggerEnv ->
+ Metrics.CoreMetricsContainer ->
+ TestEnv
+bootstrapEnv logger metrics =
+ TestEnv
+ { teProxyConfig =
+ MasterCloudProxyConfig
+ { masterUrl = Just (mkBaseUrl SC.Http "localhost" forwarderPort ""),
+ masterSecret = Just "itest-secret"
+ },
+ requestId = Just "itest-req",
+ sessionId = Just "itest-sess",
+ loggerEnv = logger,
+ shouldLogRequestId = False,
+ kafkaProducerForART = Nothing,
+ coreMetrics = metrics,
+ version = Metrics.DeploymentVersion "itest-0.0.0",
+ url = Nothing
+ }
+
+-- Wraps forwardEgressApp with a hit counter so we can assert traffic actually
+-- traversed it.
+countingForwarderApp ::
+ IOLogging.LoggerEnv ->
+ IORef Int ->
+ MasterCloudProxyConfig ->
+ Http.Manager ->
+ Application
+countingForwarderApp logEnv hits cfg mgr req sendResp = do
+ modifyIORef' hits (+ 1)
+ forwardEgressApp logEnv cfg mgr req sendResp
+
+main :: IO ()
+main = do
+ P.putStrLn "[itest] starting jsonplaceholder forwarder parity test"
+
+ -- TLS-aware Manager: used by both the local forwarder (to reach typicode)
+ -- AND the EulerHS FlowRuntime (for both the direct callAPI path and the
+ -- GCP→forwarder hop).
+ mgr <- Http.newManager HttpTLS.tlsManagerSettings
+
+ forwarderHits <- newIORef (0 :: Int)
+ let fwdCfg =
+ MasterCloudProxyConfig
+ { masterUrl = Nothing,
+ masterSecret = Just "itest-secret"
+ }
+
+ result <- Exc.try @Exc.SomeException $
+ IOLogging.withLoggerEnv silentLoggerConfig (Just "itest") $ \logger -> do
+ forwarderAsync <-
+ Async.async . Warp.run forwarderPort $
+ countingForwarderApp logger forwarderHits fwdCfg mgr
+
+ -- Let Warp bind.
+ liftIO $ threadDelay 1000000
+
+ coreMx <- Metrics.registerCoreMetricsContainer
+ R.withFlowRuntime Nothing $ \flowRt0 -> do
+ let flowRt = flowRt0 {R._httpClientManagers = HM.insert "default" mgr (R._httpClientManagers flowRt0)}
+ env = bootstrapEnv logger coreMx
+ todoBaseUrl = mkBaseUrl SC.Https "jsonplaceholder.typicode.com" 443 ""
+
+ -- Path 1: plain callAPI — direct to typicode, baseline.
+ rDirect <-
+ KFlow.runFlowR flowRt env $
+ callAPI todoBaseUrl (getTodoClient 1) "getTodo-direct" todoAPI
+
+ -- Path 2: runThroughMasterCloud — env on, forwarder configured →
+ -- request goes through localhost forwarder which replays to typicode.
+ setEnv "RUN_API_IN_MASTER_CLOUD" "True"
+ rForwarded <-
+ KFlow.runFlowR flowRt env $
+ runThroughMasterCloud todoBaseUrl (getTodoClient 1) "getTodo-forwarded"
+
+ -- Compare.
+ case (rDirect, rForwarded) of
+ (Right todoDirect, Right todoFwd) -> do
+ P.putStrLn $ "[itest] direct : " <> show todoDirect
+ P.putStrLn $ "[itest] forwarded : " <> show todoFwd
+ when (todoDirect /= todoFwd) $
+ fail' ("mismatch: direct=" <> show todoDirect <> " forwarded=" <> show todoFwd)
+ fHits <- readIORef forwarderHits
+ when (fHits /= 1) $
+ fail' ("forwarder hits=" <> show fHits <> ", expected 1")
+ P.putStrLn "PASS — direct and forwarded responses are byte-for-byte identical"
+ P.putStrLn $ " forwarder hit count: " <> show fHits
+ (Left e, _) ->
+ fail' ("direct callAPI failed: " <> show e)
+ (_, Left e) ->
+ fail' ("forwarded call failed: " <> show e)
+
+ Async.cancel forwarderAsync
+
+ case result of
+ Left e -> do
+ P.putStrLn $ "[itest] EXCEPTION: " <> P.show e
+ exitWith (ExitFailure 1)
+ Right _ -> exitWith ExitSuccess
+ where
+ fail' reason = do
+ P.putStrLn $ T.unpack ("FAIL: " <> T.pack reason)
+ exitWith (ExitFailure 1)