Skip to content

router: make full map_callrw with split args#644

Open
mrForza wants to merge 6 commits into
tarantool:masterfrom
mrForza:gh-559-full-map-call-rw-with-split-args
Open

router: make full map_callrw with split args#644
mrForza wants to merge 6 commits into
tarantool:masterfrom
mrForza:gh-559-full-map-call-rw-with-split-args

Conversation

@mrForza

@mrForza mrForza commented Mar 4, 2026

Copy link
Copy Markdown
Contributor

This patch introduces a new way of map_callrw execution by which we can
pass some arguments to all storages and split buckets' arguments to those
storages that have at least one bucket of bucket_ids. To achieve this we
introduce a new string option - mode to map_callrw api.

Also we change the logic of router_ref_storage_all ref function. Firstly
we ref all storages and get back an amount of "moved" buckets according
to the previously built router's cache. Then if there are no "moved"
buckets we accumulate and check total amount of buckets on all storages
and finish map_callrw ref stage. Otherwise, if there are some "moved"
buckets we perform the second network hop by checking on which replicasets
do the remaining "moved" buckets reside on.

Closes #559

@Serpentian Serpentian left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done, only one major comment (upgrade), we must address, other ones are nits and smth to think about)

Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
-- high-level ref functions (such as router_ref_storage_all and router_ref_
-- storage_by_buckets).
--
local function router_ref_send(router, timeout, args_builder, grouped_buckets)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, did you consider making it even more general: router_map_callrw_send and router_map_callrw_collect and also reuse them in the replicasets_map_reduce too? There we have almost the same code. Not a call to action, just something to think about.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, router_map_callrw_prepare/send/collect sounds better.

But I don't think that we can easily reuse these functions in replicasets_map_reduce because:

  1. We need to change api of router_map_callrw_send by
    • changing router param into replicasets_all, as replicasets_map_reduce does not have router variable in it.
    • adding extra argument - return_raw
  2. The logic of sending map stage in replicasets_map_reduce is more complex than logic of router_map_callrw_send. In last one we only do RPC with no arguments or with grouped_buckets[rs_id] and that's all. But in replicasets_map_reduce we need to dynamically change those arguments which should be passed in RPC. Before RPC we add bucket arguments to args and after RPC we delete them. The main problem here is that we can only easily add groupped buckets in args builder, but not delete, because the deletion is happened after RPC. If we want to do it in our new router_map_callrw_send function we need to complicate args_builder and use table.deepcopy (it can slow down perf).
  3. We need to change api of router_map_callrw_collect by passing extra return_raw argument.
  4. Also router_map_callrw_collect will be complicated as we should add two different ways of extracting results.

@Serpentian Serpentian Apr 8, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changing router param into replicasets_all, as replicasets_map_reduce does not have router variable in it.

1.1 Yes, and it will become replicasets_map_send, which is way cleaner, since you will explicitly pass the replicasets and not use some non obvious loigic to create the list of replicasets based on optional argument grouped_buckets (which won't be needed). You can return replicasets from the router_map_prepare.

adding extra argument - return_raw

1.2. Let's just pass the opts and add is_async explicitly inside the function. And call the funciton router_map_callrw_send_async. The name should always say, what the function does.

The logic of sending map stage in replicasets_map_reduce is more complex than logic of router_map_callrw_send.

  1. Agree, see no other way than deepcopy.

We need to change api of router_map_callrw_collect by passing extra return_raw argument.
Also router_map_callrw_collect will be complicated as we should add two different ways of extracting results.

  1. You can always just create the new function router_map_callrw_collect_raw and not pass the raw to it

Again, up to you, if you think, that the current variant is better, I'm ok

Comment thread vshard/storage/init.lua Outdated
Comment thread vshard/storage/init.lua Outdated
Comment thread vshard/storage/init.lua Outdated
@Serpentian Serpentian requested a review from Gerold103 March 12, 2026 18:23
@Serpentian Serpentian assigned Gerold103 and mrForza and unassigned Serpentian Mar 12, 2026

@kamenkremen kamenkremen left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great patch! Left some minor comments below

Comment thread vshard/router/init.lua Outdated
local futures = {}
local opts_async = {is_async = true}
local replicasets_all = router.replicasets
local rs_ids = grouped_buckets and grouped_buckets or replicasets_all

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that equivalent to local rs_ids = grouped_buckets or replicasets_all?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua

--
-- Perform Ref stage of the Ref-Map-Reduce process on a subset of all the
-- replicasets, which contains all the listed bucket IDs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment got cut out

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
--
-- Ref stage: collect.
--
futures = futures or {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futures is already either {}(because of line 820) or some table, so we don't need this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
-- Group the buckets by replicasets according to the router cache.
grouped_buckets, err = buckets_group(router, bucket_ids, timeout)
if err ~= nil then
return nil, err

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: triple space in indentation

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua
Comment thread vshard/router/init.lua
Comment thread test/router-luatest/map_callrw_test.lua
Comment thread test/router-luatest/map_callrw_test.lua
@kamenkremen kamenkremen removed their assignment Mar 20, 2026
@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch 2 times, most recently from 8962951 to 8f5223c Compare April 1, 2026 12:58
@mrForza mrForza requested a review from Serpentian April 1, 2026 13:33
@mrForza mrForza assigned Serpentian and unassigned Gerold103 and mrForza Apr 1, 2026

@Serpentian Serpentian left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have any major comments, I think it's time to ask @Gerold103 for review, so that we can be sure, that there no major flaws, which I've missed

Comment thread test/luatest_helpers/vtest.lua Outdated
Comment thread test/router-luatest/reload_test.lua Outdated
Comment thread test/router-luatest/reload_test.lua
Comment thread test/router-luatest/reload_test.lua Outdated
Comment thread test/router-luatest/reload_test.lua Outdated
Comment thread test/router-luatest/reload_test.lua Outdated
Comment thread vshard/storage/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
local replicasets_all = router.replicasets
local deadline = fiber_clock() + timeout
if bucket_ids then
bucket_ids = bucket_ids or {}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be in the first commit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not

Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
@Serpentian Serpentian assigned Gerold103 and mrForza and unassigned Serpentian Apr 9, 2026

@Gerold103 Gerold103 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this complex topic and being patient about comments 🙏.

Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/router/init.lua Outdated
Comment thread vshard/storage/init.lua Outdated
Comment thread vshard/storage/init.lua Outdated
Comment thread vshard/storage/init.lua Outdated
@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch 2 times, most recently from 3a2407a to 74f7492 Compare May 27, 2026 14:34
@mrForza mrForza requested a review from Gerold103 May 27, 2026 19:04
Comment thread vshard/router/init.lua

return replicaset_buckets
timeout = deadline - fiber_clock()
return replicaset_buckets, nil, timeout

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functions in vshard all have the agreement to either return nil, err, err_args ...., or result1, result1, .... The differentiation happens by the first result - whether it is nil or not. Thus you don't need to return nil on the second place in case of success. You can just return res1, res2, res3, ... . Not res1, nil, res2, res3, ... .

@mrForza mrForza Jun 4, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but we didn't follow this logic in old map_callrw. E.g. router_ref_storage_all or router_ref_storage_by_buckets.

Following this rule we should change the order of returned values not only from buckets_group, but also in router_map_callrw_prepare, router_map_callrw_ref_wait, router_ref_storage_all, router_ref_storage_by_buckets

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer changing it in all places (before your patches there were only two these strange functions). This is probably the heritage from picodata and it looks very strange to return nil error in the middle of the arguments in case of success

Comment thread vshard/router/init.lua Outdated
-- high-level ref functions (such as router_ref_storage_all and router_ref_
-- storage_by_buckets).
--
local function router_map_callrw_ref_send(router, timeout, args_builder,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, lets drop this send-function and the args builder. I tried doing that and got this:

  • Inlining allows to reuse the opts_async across all usage places.
  • Inlining doesn't need to introduce the args_builder closure callback.
  • Inlining is even 20% shorter (~40 lines inlined compared to ~50 lines with the send-function)

The whole send-function is basically just a loop calling a callback and making a callrw. The callback is different at every callsite. And the loop as a language construction isn't worth its own function.

Diff
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 3aadcc2..c84f539 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -809,31 +809,6 @@ local function router_map_callrw_prepare(router, timeout, mode, bucket_ids)
     return timeout, nil, nil, grouped_buckets
 end
 
---
--- Sends asynchronous refs to the remote storages and forms a table of future
--- objects. An arguments' table for storage_ref_* functions is built according
--- to args_builder closure which captures necessary router's variables from
--- high-level ref functions (such as router_ref_storage_all and router_ref_
--- storage_by_buckets).
---
-local function router_map_callrw_ref_send(router, timeout, args_builder,
-                                      grouped_buckets)
-    local futures = {}
-    local opts_async = {is_async = true}
-    local replicasets_all = router.replicasets
-    local rs_ids = grouped_buckets or replicasets_all
-    for rs_id, _ in pairs(rs_ids) do
-        local args_ref = args_builder(rs_id)
-        local res, err = replicasets_all[rs_id]:callrw('vshard.storage._call',
-                                                       args_ref, opts_async)
-        if res == nil then
-            return nil, err, rs_id
-        end
-        futures[rs_id] = res
-    end
-    return timeout, nil, nil, futures
-end
-
 --
 -- Waits until all future objects are ready and extracts results from it.
 --
@@ -892,27 +867,32 @@ end
 local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     local mode = MAP_CALLRW_FULL
     local bucket_count = 0
-    local err, err_id, grouped_buckets, args_builder, results
+    local err, err_id, grouped_buckets, results
     local futures = {}
     local replicasets_all = router.replicasets
+    local opts_async = {is_async = true}
 
     timeout, err, err_id, grouped_buckets = router_map_callrw_prepare(
         router, timeout, mode, bucket_ids)
     if not timeout then
         goto fail
     end
-    args_builder = function(rs_id)
-        local buckets = grouped_buckets[rs_id] or {}
-        if grouped_buckets[rs_id] then
-            return {'storage_ref_make_with_buckets', rid, timeout, buckets}
+    futures = {}
+    for rs_id in pairs(replicasets_all) do
+        local args_ref
+        local buckets = grouped_buckets[rs_id]
+        if buckets then
+            args_ref = {'storage_ref_make_with_buckets', rid, timeout, buckets}
         else
-            return {'storage_ref', rid, timeout}
+            args_ref = {'storage_ref', rid, timeout}
         end
-    end
-    timeout, err, err_id, futures = router_map_callrw_ref_send(
-        router, timeout, args_builder)
-    if not timeout then
-        goto fail
+        local res, ref_err = replicasets_all[rs_id]:callrw(
+            'vshard.storage._call', args_ref, opts_async)
+        if res == nil then
+            err, err_id = ref_err, rs_id
+            goto fail
+        end
+        futures[rs_id] = res
     end
     timeout, err, err_id, results = router_map_callrw_ref_wait(
         futures, timeout)
@@ -934,13 +914,16 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     end
     bucket_ids = router_map_callrw_process_moved(router, {}, results)
     if next(bucket_ids) then
-        args_builder = function()
-            return {'storage_ref_check_existing', rid, bucket_ids}
-        end
-        timeout, err, err_id, futures = router_map_callrw_ref_send(
-            router, timeout, args_builder)
-        if not timeout then
-            goto fail
+        local args_ref = {'storage_ref_check_existing', rid, bucket_ids}
+        futures = {}
+        for rs_id in pairs(replicasets_all) do
+            local res, ref_err = replicasets_all[rs_id]:callrw(
+                'vshard.storage._call', args_ref, opts_async)
+            if res == nil then
+                err, err_id = ref_err, rs_id
+                goto fail
+            end
+            futures[rs_id] = res
         end
         timeout, err, err_id, results = router_map_callrw_ref_wait(
             futures, timeout)
@@ -980,8 +963,10 @@ end
 --
 local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
     local mode = MAP_CALLRW_PARTIAL
-    local err, err_id, grouped_buckets, args_builder, results
+    local err, err_id, grouped_buckets, results
     local replicasets_to_map, futures = {}, {}
+    local replicasets_all = router.replicasets
+    local opts_async = {is_async = true}
     -- Nil checks are done explicitly here (== nil instead of 'not'), because
     -- netbox requests return box.NULL instead of nils.
     while next(bucket_ids) do
@@ -990,20 +975,25 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
         if not timeout then
             goto fail
         end
-        args_builder = function(rs_id)
+        futures = {}
+        for rs_id in pairs(grouped_buckets) do
+            local args_ref
             local buckets = grouped_buckets[rs_id] or {}
             if replicasets_to_map[rs_id] then
                 -- Replicaset is already referenced on a previous iteration.
                 -- Simply get the moved buckets without double referencing.
-                return {'storage_ref_check_with_buckets', rid, buckets}
+                args_ref = {'storage_ref_check_with_buckets', rid, buckets}
             else
-                return {'storage_ref_make_with_buckets', rid, timeout, buckets}
+                args_ref = {'storage_ref_make_with_buckets', rid, timeout,
+                            buckets}
             end
-        end
-        timeout, err, err_id, futures = router_map_callrw_ref_send(
-            router, timeout, args_builder, grouped_buckets)
-        if not timeout then
-            goto fail
+            local res, ref_err = replicasets_all[rs_id]:callrw(
+                'vshard.storage._call', args_ref, opts_async)
+            if res == nil then
+                err, err_id = ref_err, rs_id
+                goto fail
+            end
+            futures[rs_id] = res
         end
         timeout, err, err_id, results = router_map_callrw_ref_wait(
             futures, timeout)

Does it really not look simpler to you?

We can go even further and reuse the args Lua tables:

Optimized
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 3aadcc2..7311bca 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -809,31 +809,6 @@ local function router_map_callrw_prepare(router, timeout, mode, bucket_ids)
     return timeout, nil, nil, grouped_buckets
 end
 
---
--- Sends asynchronous refs to the remote storages and forms a table of future
--- objects. An arguments' table for storage_ref_* functions is built according
--- to args_builder closure which captures necessary router's variables from
--- high-level ref functions (such as router_ref_storage_all and router_ref_
--- storage_by_buckets).
---
-local function router_map_callrw_ref_send(router, timeout, args_builder,
-                                      grouped_buckets)
-    local futures = {}
-    local opts_async = {is_async = true}
-    local replicasets_all = router.replicasets
-    local rs_ids = grouped_buckets or replicasets_all
-    for rs_id, _ in pairs(rs_ids) do
-        local args_ref = args_builder(rs_id)
-        local res, err = replicasets_all[rs_id]:callrw('vshard.storage._call',
-                                                       args_ref, opts_async)
-        if res == nil then
-            return nil, err, rs_id
-        end
-        futures[rs_id] = res
-    end
-    return timeout, nil, nil, futures
-end
-
 --
 -- Waits until all future objects are ready and extracts results from it.
 --
@@ -892,27 +867,35 @@ end
 local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     local mode = MAP_CALLRW_FULL
     local bucket_count = 0
-    local err, err_id, grouped_buckets, args_builder, results
+    local err, err_id, grouped_buckets, results
     local futures = {}
     local replicasets_all = router.replicasets
+    local opts_async = {is_async = true}
+    local args_ref
 
     timeout, err, err_id, grouped_buckets = router_map_callrw_prepare(
         router, timeout, mode, bucket_ids)
     if not timeout then
         goto fail
     end
-    args_builder = function(rs_id)
-        local buckets = grouped_buckets[rs_id] or {}
-        if grouped_buckets[rs_id] then
-            return {'storage_ref_make_with_buckets', rid, timeout, buckets}
+    futures = {}
+    args_ref = {nil, rid, timeout}
+    for rs_id in pairs(replicasets_all) do
+        local buckets = grouped_buckets[rs_id]
+        if buckets then
+            args_ref[1] = 'storage_ref_make_with_buckets'
+            args_ref[4] = buckets
         else
-            return {'storage_ref', rid, timeout}
+            args_ref[1] = 'storage_ref'
+            args_ref[4] = nil
         end
-    end
-    timeout, err, err_id, futures = router_map_callrw_ref_send(
-        router, timeout, args_builder)
-    if not timeout then
-        goto fail
+        local res, ref_err = replicasets_all[rs_id]:callrw(
+            'vshard.storage._call', args_ref, opts_async)
+        if res == nil then
+            err, err_id = ref_err, rs_id
+            goto fail
+        end
+        futures[rs_id] = res
     end
     timeout, err, err_id, results = router_map_callrw_ref_wait(
         futures, timeout)
@@ -934,13 +917,16 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     end
     bucket_ids = router_map_callrw_process_moved(router, {}, results)
     if next(bucket_ids) then
-        args_builder = function()
-            return {'storage_ref_check_existing', rid, bucket_ids}
-        end
-        timeout, err, err_id, futures = router_map_callrw_ref_send(
-            router, timeout, args_builder)
-        if not timeout then
-            goto fail
+        args_ref = {'storage_ref_check_existing', rid, bucket_ids}
+        futures = {}
+        for rs_id in pairs(replicasets_all) do
+            local res, ref_err = replicasets_all[rs_id]:callrw(
+                'vshard.storage._call', args_ref, opts_async)
+            if res == nil then
+                err, err_id = ref_err, rs_id
+                goto fail
+            end
+            futures[rs_id] = res
         end
         timeout, err, err_id, results = router_map_callrw_ref_wait(
             futures, timeout)
@@ -980,8 +966,10 @@ end
 --
 local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
     local mode = MAP_CALLRW_PARTIAL
-    local err, err_id, grouped_buckets, args_builder, results
+    local err, err_id, grouped_buckets, results
     local replicasets_to_map, futures = {}, {}
+    local replicasets_all = router.replicasets
+    local opts_async = {is_async = true}
     -- Nil checks are done explicitly here (== nil instead of 'not'), because
     -- netbox requests return box.NULL instead of nils.
     while next(bucket_ids) do
@@ -990,20 +978,28 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
         if not timeout then
             goto fail
         end
-        args_builder = function(rs_id)
+        futures = {}
+        local args_ref = {nil, rid}
+        for rs_id in pairs(grouped_buckets) do
             local buckets = grouped_buckets[rs_id] or {}
             if replicasets_to_map[rs_id] then
                 -- Replicaset is already referenced on a previous iteration.
                 -- Simply get the moved buckets without double referencing.
-                return {'storage_ref_check_with_buckets', rid, buckets}
+                args_ref[1] = 'storage_ref_check_with_buckets'
+                args_ref[3] = buckets
+                args_ref[4] = nil
             else
-                return {'storage_ref_make_with_buckets', rid, timeout, buckets}
+                args_ref[1] = 'storage_ref_make_with_buckets'
+                args_ref[3] = timeout
+                args_ref[4] = buckets
             end
-        end
-        timeout, err, err_id, futures = router_map_callrw_ref_send(
-            router, timeout, args_builder, grouped_buckets)
-        if not timeout then
-            goto fail
+            local res, ref_err = replicasets_all[rs_id]:callrw(
+                'vshard.storage._call', args_ref, opts_async)
+            if res == nil then
+                err, err_id = ref_err, rs_id
+                goto fail
+            end
+            futures[rs_id] = res
         end
         timeout, err, err_id, results = router_map_callrw_ref_wait(
             futures, timeout)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
local rs_total = type(res) == 'table' and res.total or res
bucket_count = bucket_count + rs_total
end
bucket_ids = router_map_callrw_process_moved(router, {}, results)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

router_map_callrw_process_moved() takes 2 params, not 3. The third one is ignored. Do tests pass right now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread test/router-luatest/reload_test.lua Outdated
}
local global_cfg

local function get_config_for_specific_vshard_version()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have vshard in the name. The whole project is vshard, so here everything by default is about vshard.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -691,3 +691,180 @@ g.test_map_callrw_with_cdata_bucket_id = function(cg)
ilt.assert_not(err)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last commit says in its doc request:

mode = 'full'. In this mode user function will be executed with args on
   all storages in cluster. If we pass 'bucket_ids' like a map ....

which means users can pass bucket_ids with mode='full'.
And then we say:

Also now `map_callrw` ends with error in
cases of `<mode = 'full', bucket_ids = {1, 2, ...}>` ...

which looks conflicting.

It took me a couple of minutes to understand that the first paragraph means bucket_ids being {id = {args}, id = {args}, ...}. And the second talks about bucket_ids = {id, id, id, ...}. This might indicate me not being the brightest bulb 😂, but some users might also get confused then. Lets make it more explicit somehow.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/storage/init.lua Outdated
local function bucket_get_existing(bucket_ids)
local res = {}
for _, bucket_id in pairs(bucket_ids) do
local bucket = M.route_map[bucket_id] or

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

route_map isn't bucket ref cache. It stores {id = dst} map. The values here are destination IDs, not bucket objects. I am not sure this code works. Does it? When the ID is in route_map, your bucket becomes equal a string, and later bucket.status will raise a Lua error on attempt to index a string.

The bucket ref cache is in M.bucket_refs. Buckets there have no status. You have to look at the ro_lock and rw_lock to understand if the bucket is readable/writable.

Hm. With that said I realized just now that our storage_ref_check_existing must be mode-aware, no? Because right now it must check the bucket is writable. READONLY isn't enough for a map-callrw. And when we introduce map_callro, we must allow READONLY. Which means storage_ref_check_existing should behave different depending on the map-call-mode. Or am I wrong?

I think this might be bigger than I thought, so perhaps @Serpentian can join.

Comment thread vshard/error.lua Outdated
},
[43] = {
name = 'UNSUPPORTED',
msg = 'Can\'t perform %s. The storage should be upgraded',

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error codes are shared by storage and router. If the code says 'unsupported', then the message must be storage/router-agnostic (not mention storage or router).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
if type(res) ~= 'table' then
goto continue
end
local moved = res.moved or {}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to create a table when res.moved is nil. Can simply do goto continue then.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
end
for _, res in pairs(results) do
bucket_count = bucket_count + res
if type(res) == 'table' and not res.total then

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but it still looks very very strange and fragile, that we use type(res) as a hint which ref-function was invoked. Especially given that this check is repeated at least in 3 different places and is highly non-obvious.

I suggest you the following diff:

diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 7311bca..7f9f177 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -810,10 +810,10 @@ local function router_map_callrw_prepare(router, timeout, mode, bucket_ids)
 end
 
 --
--- Waits until all future objects are ready and extracts results from it.
+-- Waits until all the future objects are ready. After it returns successfully
+-- the result of each future can be read without blocking.
 --
 local function router_map_callrw_ref_wait(futures, timeout)
-    local results = {}
     local deadline = fiber_clock() + timeout
     for id, future in pairs(futures) do
         timeout = deadline - fiber_clock()
@@ -823,13 +823,11 @@ local function router_map_callrw_ref_wait(futures, timeout)
             return nil, err, id
         end
         -- Ref returns nil,err or bucket count.
-        res, err = res[1], res[2]
-        if res == nil then
-            return nil, err, id
+        if res[1] == nil then
+            return nil, res[2], id
         end
-        results[id] = res
     end
-    return timeout, nil, nil, results
+    return timeout, nil, nil
 end
 
 --
@@ -840,9 +838,6 @@ end
 local function router_map_callrw_process_moved(router, results)
     local bucket_ids = {}
     for _, res in pairs(results) do
-        if type(res) ~= 'table' then
-            goto continue
-        end
         local moved = res.moved or {}
         for _, bucket in pairs(moved) do
             local bid = bucket.id
@@ -856,7 +851,6 @@ local function router_map_callrw_process_moved(router, results)
             end
             table.insert(bucket_ids, bid)
         end
-        ::continue::
     end
     return bucket_ids
 end
@@ -868,7 +862,7 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     local mode = MAP_CALLRW_FULL
     local bucket_count = 0
     local err, err_id, grouped_buckets, results
-    local futures = {}
+    local future_buckets, future_all, futures = {}, {}, {}
     local replicasets_all = router.replicasets
     local opts_async = {is_async = true}
     local args_ref
@@ -878,16 +872,18 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
     if not timeout then
         goto fail
     end
-    futures = {}
     args_ref = {nil, rid, timeout}
     for rs_id in pairs(replicasets_all) do
         local buckets = grouped_buckets[rs_id]
+        local target
         if buckets then
             args_ref[1] = 'storage_ref_make_with_buckets'
             args_ref[4] = buckets
+            target = future_buckets
         else
             args_ref[1] = 'storage_ref'
             args_ref[4] = nil
+            target = future_all
         end
         local res, ref_err = replicasets_all[rs_id]:callrw(
             'vshard.storage._call', args_ref, opts_async)
@@ -895,15 +891,22 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
             err, err_id = ref_err, rs_id
             goto fail
         end
+        target[rs_id] = res
         futures[rs_id] = res
     end
-    timeout, err, err_id, results = router_map_callrw_ref_wait(
-        futures, timeout)
+    timeout, err, err_id = router_map_callrw_ref_wait(futures, timeout)
     if not timeout then
         goto fail
     end
+    results = {}
+    for rs_id, future in pairs(future_buckets) do
+        results[rs_id] = future:result()[1]
+    end
+    for rs_id, future in pairs(future_all) do
+        results[rs_id] = {total = future:result()[1]}
+    end
     for _, res in pairs(results) do
-        if type(res) == 'table' and not res.total then
+        if not res.total then
             -- This error throws only in case when the updated router tries
             -- to calculate the total amount of buckets . On old storage
             -- versions the function storage_ref_make_with_buckets doesn't
@@ -912,10 +915,9 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
                                 'full map_callrw with split args')
             goto fail
         end
-        local rs_total = type(res) == 'table' and res.total or res
-        bucket_count = bucket_count + rs_total
+        bucket_count = bucket_count + res.total
     end
-    bucket_ids = router_map_callrw_process_moved(router, {}, results)
+    bucket_ids = router_map_callrw_process_moved(router, results)
     if next(bucket_ids) then
         args_ref = {'storage_ref_check_existing', rid, bucket_ids}
         futures = {}
@@ -928,13 +930,12 @@ local function router_ref_storage_all(router, bucket_ids, timeout, rid)
             end
             futures[rs_id] = res
         end
-        timeout, err, err_id, results = router_map_callrw_ref_wait(
-            futures, timeout)
+        timeout, err, err_id = router_map_callrw_ref_wait(futures, timeout)
         if not timeout then
             goto fail
         end
-        for rs_id, res in pairs(results) do
-            for _, bucket_id in pairs(res) do
+        for rs_id, future in pairs(futures) do
+            for _, bucket_id in pairs(future:result()[1]) do
                 bucket_reset(router, bucket_id)
                 bucket_set(router, bucket_id, rs_id)
             end
@@ -1001,11 +1002,14 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid)
             end
             futures[rs_id] = res
         end
-        timeout, err, err_id, results = router_map_callrw_ref_wait(
-            futures, timeout)
+        timeout, err, err_id = router_map_callrw_ref_wait(futures, timeout)
         if not timeout then
             goto fail
         end
+        results = {}
+        for rs_id, future in pairs(futures) do
+            results[rs_id] = future:result()[1]
+        end
         bucket_ids = router_map_callrw_process_moved(router, results)
         for rs_id, res in pairs(results) do
             if res.is_done then

There is only one place, where we might get either total bucket count or a table. And it can be reworked in a way which allows to drop this type(res) from everywhere. The idea is to unify the result for all calls, so even results of storage_ref are actually tables but having moved = nil.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch from 74f7492 to fc4a323 Compare June 15, 2026 09:53
@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch from fc4a323 to 39b8c53 Compare June 15, 2026 10:52
mrForza added 5 commits June 15, 2026 15:49
Before this patch the main `map_callrw` ref functions such as
`router_ref_storage_all` and `router_ref_storage_by_buckets` were
enormous (71 and 108 lines of code). Also these functions have a large
number of similar functional code blocks such as "sending refs",
"collecting refs" e.t.c. Since in tarantool#559 patch we will
extend the logic of full map_callrw making it able to work with split
args, the `router_ref_storage_all` can double in size. It can lead to
degradation of our codebase due to less readability.

To fix it we firstly determine general and repeated code blocks in ref
functions:
1) `ref-prepare`: groups buckets by replicasets with router's cache,
   builds a table of "target" replicasets and waits necessary masters.
2) `ref-send`: sends refs to the remote storage asynchronously and builds
   a table of future objects for the next processing.
3) `ref-collect`: waits until future objects are ready in order to
   extract payload from it (responses of storages' functions).
4) `ref-process`: a custom logic for `full` or `partial` map_callrw
   modes which describes how we should process results from future
   objects.

After defining the main stages of ref map_callrw functions we should
unify them so that we can use them in both `router_ref_storage_all` and
`router_ref_storage_by_buckets`.

Needed for tarantool#559

NO_TEST=refactoring
NO_DOC=refactoring
In this patch we change `allstatus.GARBAGE/SENT` on `BACTIVE/BSENT` to
not repeat the code.

Needed for tarantool#559

NO_DOC=refactoring
NO_TEST=refactoring
In this patch we change `status.SENDING/SENT/GARBAGE/RECEIVING` on
`BSENDING/BSENT/BGARBAGE/BRECEIVING` to not complicate the code in
`bucket_are_all_rw_not_cache`.

Needed for tarantool#559

NO_DOC=refactoring
NO_TEST=refactoring
This patch takes initialization of `rid` out to `router_map_callrw` and
passes this variable to ref-functions. It is needed for future features
tidiness, for example - `make full map_callrw with split args` in which
the logic of `router_map_callrw` becomes more complex.

Needed for tarantool#559

NO_DOC=refactoring
NO_TEST=refactoring
Before this patch the `router-luatest/reload_test` checked router's
services only with old routers. However in future patch (tarantoolgh-214) we
need to check map_callrw with old storages.

In order to make it able we:
1) change `vtest.cluster_new` so that we can pass server_config with
   certain ENV (LUA_PATH) variable into it. It can help us to create
   a new cluster on old version of vshard.
2) change `reload_router` to more general `reload_server` in order to
   unify the process of servers (router / storage) upgrade in
   `router-luatest/reload_test`.
3) unify the process of cluster creation -
   `create_cluster_on_specific_version` and the process of getting
   server's config with new ENV (LUA_PATH) variable -
   `get_config_for_specific_vshard_version`.

Needed for tarantool#214

NO_DOC=refactoring
NO_TEST=refactoring
@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch from 39b8c53 to cc99952 Compare June 15, 2026 15:59
This patch introduces a new way of `map_callrw` execution by which we can
pass some arguments to all storages and split buckets' arguments to those
storages that have at least one bucket of `bucket_ids`. To achieve this we
introduce a new string option - `mode` to `map_callrw` api.

Also we change the logic of `router_ref_storage_all` ref function. Firstly
we ref all storages and get back an amount of "moved" buckets according
to the previously built router's cache. Then if there are no "moved"
buckets we accumulate and check total amount of buckets on all storages
and finish map_callrw ref stage. Otherwise, if there are some "moved"
buckets we perform the second network hop by checking on which replicasets
do the remaining "moved" buckets reside on.

Closes tarantool#559

@TarantoolBot document

Title: vshard: `mode` option for `router.map_callrw()`

This string option regulates on which storages the user function will be
executed via `map_callrw`. Possible values:
1) mode = 'partial'. In this mode user function will be executed on
   storages that have at least one bucket of 'bucket_ids'. The
   'bucket_ids' option can be presented in two ways: like a numeric array
   of buckets' ids or like a map of buckets' arguments. In first one user
   function will only receive args, in second one it will additionally
   receive buckets' arguments.
2) mode = 'full'. In this mode user function will be executed with args on
   all storages in cluster. Also we can pass 'bucket_ids' only like a map
   of bucket's arguments - `bucket_ids = {id = {args}, id = {args}, ...}`.
   In this way the user function will additionally receive buckets'
   arguments on those storages that have at least one bucket of
   `bucket_ids`.

If we didn't specify the 'mode' option, then it is set based on
'bucket_ids' option for backward compatibility. If 'bucket_ids' is
presented, the mode will be 'partial' otherwise 'full'.

Also now `map_callrw` ends with error in cases when we
1) pass `mode = 'full'` and a numeric array of buckets' ids -
   `bucket_ids = {id, id, ...}`
2) pass `mode = 'partial'` and `bucket_ids = nil`.
@mrForza mrForza force-pushed the gh-559-full-map-call-rw-with-split-args branch from cc99952 to fe04552 Compare June 15, 2026 16:14
@mrForza mrForza requested review from Gerold103 and Serpentian June 15, 2026 16:33
@mrForza mrForza assigned Gerold103 and Serpentian and unassigned Gerold103 Jun 15, 2026

@Serpentian Serpentian left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's clean the last commit and I strongly recommend reusing the existing masters_map_call where possible. Looks good in general

Comment thread vshard/router/init.lua

return replicaset_buckets
timeout = deadline - fiber_clock()
return replicaset_buckets, nil, timeout

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer changing it in all places (before your patches there were only two these strange functions). This is probably the heritage from picodata and it looks very strange to return nil error in the middle of the arguments in case of success

Comment thread vshard/router/init.lua Outdated
local replicasets_all = router.replicasets
local deadline = fiber_clock() + timeout
if bucket_ids then
bucket_ids = bucket_ids or {}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this line is not needed, you've already checked the bucket_ids

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua
--
-- Waits until all future objects are ready and extracts results from it.
--
local function router_map_callrw_ref_wait(futures, timeout)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this differs from the one, we have already in the replicaset module. It's named masters_map_call. E.g. router_ref_storage_all can be almost fully replaced with it, just map_call over masters with storage_ref function. For router_ref_storage_by_buckets we'll need to extend its API: e.g. make func a function, which will return the name of the func to call based on the replicaset.

What I'm trying to say, we already have it implemented. And if we wanna refactor all of that, let's reuse the existing code and not duplicate it one more time.

The only function, which makes sense moving out to the separate is router_map_callrw_process_moved, since (If I remeber correctly) you use it in the upcoming commits. But it should be done, when it's needed. It's very non-obvious in that commit, why it's moved to the seprate function, since it's used once.

WDYT?

Comment thread vshard/storage/init.lua

local function bucket_are_all_rw_not_cache()
local status_index = box.space._bucket.index.status
local status = consts.BUCKET

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

t.assert(is_success)
end

local function create_cluster_on_specific_version(g, hash)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The reload_test looks messy in the last commit. You've introduced the function create_cluster_on_specific_version one commit before, why do we move it here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua Outdated
local replicasets_all = router.replicasets
local deadline = fiber_clock() + timeout
if bucket_ids then
bucket_ids = bucket_ids or {}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not

Comment thread vshard/router/init.lua Outdated
end
results[id] = res
end
return timeout, nil, nil, results

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be in the first commit too, I suppose

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it should be in the last commit because we get rid of results table in the last commit

Comment thread vshard/router/init.lua
local buckets = replicasets[rs_id]
local res, ref_err = rs:callrw('vshard.storage._call',
{'storage_ref', rid, timeout}, opts_async)
args_ref = {nil, rid, timeout}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: if you don't mind, let's please describe the algorithm of the new mode in the commit msg

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have already described an algorithm of the new map_callrw mode in the commit message.

Comment thread vshard/storage/init.lua
box.space._bucket:get{bucket_id}
if bucket and bucket.status ~= BGARBAGE and bucket.status ~= BSENT then
table.insert(res, bucket_id)
end

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's yield on 'BUCKET_CHUNK_SIZE, see bucket_test_gc` e.g.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua
args_ref = {nil, rid, timeout}
for rs_id in pairs(replicasets) do
local buckets = grouped_buckets[rs_id]
local target

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target looks unused

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment thread vshard/router/init.lua
local buckets = grouped_buckets[rs_id]
local target
if buckets then
args_ref[1] = 'storage_ref_make_with_buckets'

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd propose to specify the whole arg table. It's very difficult to parse

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Full map_callrw with split args

4 participants