1: -module(cets_disco_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: 
    4: -import(distributed_helper, [mim/0, mim2/0, rpc/4]).
    5: -include_lib("common_test/include/ct.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: 
    8: 
    9: %%--------------------------------------------------------------------
   10: %% Suite configuration
   11: %%--------------------------------------------------------------------
   12: 
   13: all() ->
   14:     [{group, file}, {group, rdbms}].
   15: 
   16: groups() ->
   17:     [{file, [], file_cases()},
   18:      {rdbms, [], rdbms_cases()}].
   19: 
   20: file_cases() ->
   21:     [file_backend].
   22: 
   23: rdbms_cases() ->
   24:     [rdbms_backend,
   25:      rdbms_backend_supports_auto_cleaning,
   26:      rdbms_backend_node_doesnt_remove_itself,
   27:      rdbms_backend_db_queries].
   28: 
   29: suite() ->
   30:     distributed_helper:require_rpc_nodes([mim, mim2]).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Init & teardown
   34: %%--------------------------------------------------------------------
   35: 
   36: init_per_group(rdbms, Config) ->
   37:     case not ct_helper:is_ct_running()
   38:          orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
   39:         false -> {skip, rdbms_or_ct_not_running};
   40:         true -> Config
   41:     end;
   42: init_per_group(_, Config) ->
   43:     Config.
   44: 
   45: end_per_group(_, Config) ->
   46:     Config.
   47: 
   48: init_per_testcase(_CaseName, Config) -> Config.
   49: 
   50: end_per_testcase(_CaseName, Config) ->
   51:     unmock(mim()),
   52:     unmock(mim2()).
   53: 
   54: %%--------------------------------------------------------------------
   55: %% Test cases
   56: %%--------------------------------------------------------------------
   57: 
   58: file_backend(Config) ->
   59:     Path = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
   60:     Opts = #{disco_file => Path},
   61:     State = rpc(mim(), cets_discovery_file, init, [Opts]),
   62:     {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]),
   63:     ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)).
   64: 
   65: rdbms_backend(_Config) ->
   66:     CN = random_cluster_name(?FUNCTION_NAME),
   67:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
   68:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
   69: 
   70:     State1 = disco_init(mim(), Opts1),
   71:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
   72:     ?assertMatch(#{last_query_info := #{already_registered := false}}, State1_2),
   73:     ?assertEqual([], Nodes1_2),
   74: 
   75:     %% "test2" node can see "test1" on initial registration
   76:     State2 = disco_init(mim2(), Opts2),
   77:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
   78:     ?assertMatch(#{last_query_info := #{already_registered := false}}, State2_2),
   79:     ?assertEqual([test1], Nodes2_2),
   80: 
   81:     %% "test2" node can see "test1" on update
   82:     {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
   83:     ?assertEqual(lists:sort([test1, test2]), lists:sort(Nodes2_3)),
   84:     ?assertMatch(#{last_query_info := #{already_registered := true}}, State2_3).
   85: 
   86: rdbms_backend_supports_auto_cleaning(_Config) ->
   87:     Timestamp = month_ago(),
   88:     mock_timestamp(mim(), Timestamp),
   89:     CN = random_cluster_name(?FUNCTION_NAME),
   90:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
   91:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
   92: 
   93:     %% test1 row is written with an old (mocked) timestamp
   94:     State1 = disco_init(mim(), Opts1),
   95:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
   96:     {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
   97:     ?assertEqual([], Nodes1_2),
   98:     ?assertEqual([test1], Nodes1_3),
   99:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2),
  100:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3),
  101: 
  102:     %% test2 would clean test1 registration
  103:     %% We don't mock on mim2 node, so timestamps would differ
  104:     State2 = disco_init(mim2(), Opts2),
  105:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  106:     ?assertEqual([], Nodes2_2),
  107:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}},
  108:                  State2_2),
  109:     {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2),
  110:     ?assertEqual([test2], Nodes2_3),
  111:     #{last_query_info := #{last_rows := SelectedRows}} = State2_3,
  112:     ?assertMatch(1, length(SelectedRows)).
  113: 
  114: rdbms_backend_node_doesnt_remove_itself(_Config) ->
  115:     Timestamp = month_ago(),
  116:     mock_timestamp(mim(), Timestamp),
  117:     CN = random_cluster_name(?FUNCTION_NAME),
  118:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  119:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  120: 
  121:     %% test1 row is written with an old (mocked) timestamp
  122:     State1 = disco_init(mim(), Opts1),
  123:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  124:     ?assertEqual([], Nodes1_2),
  125:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2),
  126: 
  127:     unmock_timestamp(mim()),
  128:     %% test1 row is not removed and timestamp is updated
  129:     {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  130:     ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3),
  131:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
  132:                  State1_3),
  133:     ?assertEqual([test1], Nodes1_3),
  134: 
  135:     State2 = disco_init(mim2(), Opts2),
  136:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  137:     ?assertEqual([test1], Nodes2_2),
  138:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
  139:                  State2_2).
  140: 
  141: rdbms_backend_db_queries(_Config) ->
  142:     CN = random_cluster_name(?FUNCTION_NAME),
  143:     TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
  144:     TS2 = TS + 100,
  145: 
  146:     %% insertion fails if node name or node num is already added for the cluster
  147:     ?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, TS, 1)),
  148:     ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 1)),
  149:     ?assertMatch({error, _}, insert_new(CN, <<"test1">>, TS, 2)),
  150:     ?assertMatch({error, _}, insert_new(CN, <<"test2">>, TS, 1)),
  151:     ?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, TS, 2)),
  152: 
  153:     %% update of the timestamp works correctly
  154:     {selected, SelectedNodes1} = select(CN),
  155:     ?assertEqual(lists:sort([{<<"test1">>, 1, TS}, {<<"test2">>, 2, TS}]),
  156:                  lists:sort(SelectedNodes1)),
  157:     ?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, TS2)),
  158:     {selected, SelectedNodes2} = select(CN),
  159:     ?assertEqual(lists:sort([{<<"test1">>, 1, TS2}, {<<"test2">>, 2, TS}]),
  160:                  lists:sort(SelectedNodes2)),
  161: 
  162:     %% node removal work correctly
  163:     ?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
  164:     ?assertEqual({selected, [{<<"test2">>, 2, TS}]}, select(CN)).
  165: 
  166: %%--------------------------------------------------------------------
  167: %% Helpers
  168: %%--------------------------------------------------------------------
  169: 
  170: disco_init(Node, Opts) ->
  171:     State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
  172:     log_disco_request(?FUNCTION_NAME, Node, Opts, State),
  173:     State.
  174: 
  175: disco_get_nodes(Node, State) ->
  176:     NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
  177:     log_disco_request(?FUNCTION_NAME, Node, State, NewState),
  178:     NewState.
  179: 
  180: log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
  181:     ct:log("[0] disco_init(~p,~n" ++
  182:            "               ~p) =~n" ++
  183:            "        ~p",
  184:            [Node, Opts, State]),
  185:     erlang:put({disco, Node, CN}, 1);
  186: log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
  187:     N = case erlang:get({disco, Node, CN}) of
  188:             undefined -> 1;
  189:             Int when is_integer(Int) -> Int
  190:         end,
  191:     ct:log("[~p] disco_get_nodes(~p,~n" ++
  192:            "                    ~p) =~n" ++
  193:            "        ~p",
  194:            [N, Node, OldState, NewState]),
  195:     erlang:put({disco, Node, CN}, N+1).
  196: 
  197: timestamp() ->
  198:     os:system_time(second).
  199: 
  200: month_ago() ->
  201:     timestamp() - timer:hours(24 * 30) div 1000.
  202: 
  203: mock_timestamp(Node, Timestamp) ->
  204:     ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
  205:     ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
  206:     %% Ensure that we mock
  207:     Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).
  208: 
  209: unmock_timestamp(Node) ->
  210:     ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
  211: 
  212: unmock(Node) ->
  213:     rpc(Node, meck, unload, []).
  214: 
  215: random_cluster_name(CaseName) ->
  216:     Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
  217:     <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.
  218: 
  219: insert_new(CN, BinNode, TS, NodeNum) ->
  220:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, TS, NodeNum]),
  221:     ct:log("insert_new(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, TS, NodeNum, Ret]),
  222:     Ret.
  223: 
  224: select(CN) ->
  225:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
  226:     ct:log("select(~p) = ~p", [CN, Ret]),
  227:     Ret.
  228: 
  229: update_existing(CN, BinNode, TS) ->
  230:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, TS]),
  231:     ct:log("select(~p, ~p, ~p) = ~p", [CN, BinNode, TS, Ret]),
  232:     Ret.
  233: 
  234: delete_node_from_db(CN, BinNode) ->
  235:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
  236:     ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
  237:     Ret.