1: -module(cets_disco_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: 
    4: -import(distributed_helper, [mim/0, mim2/0, rpc/4]).
    5: -include_lib("common_test/include/ct.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: 
    8: 
    9: %%--------------------------------------------------------------------
   10: %% Suite configuration
   11: %%--------------------------------------------------------------------
   12: 
   13: all() ->
   14:     [{group, file}, {group, rdbms}].
   15: 
   16: groups() ->
   17:     [{file, [], file_cases()},
   18:      {rdbms, [], rdbms_cases()}].
   19: 
   20: file_cases() ->
   21:     [file_backend].
   22: 
   23: rdbms_cases() ->
   24:     [rdbms_backend,
   25:      rdbms_backend_supports_cluster_change,
   26:      rdbms_backend_cluster_name_contains_cets_version,
   27:      rdbms_backend_supports_auto_cleaning,
   28:      rdbms_backend_node_doesnt_remove_itself,
   29:      rdbms_backend_db_queries,
   30:      rdbms_backend_publishes_node_ip,
   31:      no_record_for_node,
   32:      no_ip_in_db,
   33:      epmd_just_returns_ip_from_db,
   34:      address_please,
   35:      address_please_returns_ip,
   36:      address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
   37:      address_please_returns_ip_127_0_0_1_from_db].
   38: 
   39: suite() ->
   40:     distributed_helper:require_rpc_nodes([mim, mim2]).
   41: 
   42: %%--------------------------------------------------------------------
   43: %% Init & teardown
   44: %%--------------------------------------------------------------------
   45: 
   46: init_per_group(rdbms, Config) ->
   47:     case not ct_helper:is_ct_running()
   48:          orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
   49:         false -> {skip, rdbms_or_ct_not_running};
   50:         true ->
   51:             stop_and_delete_cets_discovery_if_running(),
   52:             Config
   53:     end;
   54: init_per_group(_, Config) ->
   55:     Config.
   56: 
   57: end_per_group(rdbms, Config) ->
   58:     restore_default_cets_discovery(),
   59:     Config;
   60: end_per_group(_, Config) ->
   61:     Config.
   62: 
   63: init_per_testcase(address_please_returns_ip, Config) ->
   64:     start_cets_discovery(Config);
   65: init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
    start_cets_discovery_with_file_backend(Config);
   67: init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
   68:     start_cets_discovery_with_real_ips(Config);
   69: init_per_testcase(_CaseName, Config) -> Config.
   70: 
   71: end_per_testcase(Name, Config) when Name == address_please_returns_ip;
   72:                                     Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
   73:                                     Name == address_please_returns_ip_127_0_0_1_from_db ->
   74:     stop_cets_discovery(),
   75:     Config;
   76: end_per_testcase(_CaseName, _Config) ->
   77:     unmock(mim()),
   78:     unmock(mim2()).
   79: 
   80: %%--------------------------------------------------------------------
   81: %% Test cases
   82: %%--------------------------------------------------------------------
   83: 
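%% The file backend reads the node list from nodes.txt in the suite's data directory.
%% We call the backend module directly over RPC and check the parsed node names.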
   84: file_backend(Config) ->
   85:     Path = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
   86:     Opts = #{disco_file => Path},
   87:     State = rpc(mim(), cets_discovery_file, init, [Opts]),
   88:     {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]),
   89:     ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)).
   90: 
   91: rdbms_backend(_Config) ->
   92:     CN = random_cluster_name(?FUNCTION_NAME),
   93:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
   94:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
   95: 
   96:     init_and_get_nodes(mim(), Opts1, []),
   97: 
   98:     %% "test2" node can see "test1" on initial registration
   99:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
  100: 
  101:     %% "test2" node can see "test1" on update
  102:     get_nodes(mim2(), State2, [test1, test2]).
  103: 
  104: rdbms_backend_supports_cluster_change(_Config) ->
  105:     CN1 = random_cluster_name(?FUNCTION_NAME),
  106:     CN2 = <<CN1/binary, "_new">>,
  107:     Opts1 = #{cluster_name => CN1, node_name_to_insert => <<"test1">>},
  108:     Opts2 = #{cluster_name => CN1, node_name_to_insert => <<"test2">>},
  109: 
  110:     %% Nodes test1 and test2 are in CN1, and they become connected
  111:     State1 = init_and_get_nodes(mim(), Opts1, []),
  112:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
  113:     get_nodes(mim(), State1, [test1, test2]),
  114: 
  115:     %% Node test1 moves to CN2, and the nodes are disconnected
  116:     NewState1 = init_and_get_nodes(mim(), Opts1#{cluster_name := CN2}, []),
  117:     get_nodes(mim2(), State2, [test2]),
  118:     NewState1A = get_nodes(mim(), NewState1, [test1]),
  119: 
  120:     %% Node test2 moves to CN2, and the nodes are connected again
  121:     init_and_get_nodes(mim2(), Opts2#{cluster_name := CN2}, [test1]),
  122:     get_nodes(mim(), NewState1A, [test1, test2]).
  123: 
  124: rdbms_backend_cluster_name_contains_cets_version(_Config) ->
  125:     CN = random_cluster_name(?FUNCTION_NAME),
  126:     Opts = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  127:     #{cluster_name := CNWithVsn} = init_and_get_nodes(mim(), Opts, []),
  128:     [<<>>, Vsn] = binary:split(CNWithVsn, CN),
  129:     ?assertMatch({match, _}, re:run(Vsn, "-[0-9]+\\.[0-9]+")).
  130: 
  131: rdbms_backend_supports_auto_cleaning(_Config) ->
  132:     Timestamp = month_ago(),
  133:     mock_timestamp(mim(), Timestamp),
  134:     CN = random_cluster_name(?FUNCTION_NAME),
  135:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  136:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  137: 
  138:     %% test1 row is written with an old (mocked) timestamp
  139:     State1 = init_and_get_nodes(mim(), Opts1, []),
  140:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),
  141:     State1A = get_nodes(mim(), State1, [test1]),
  142:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),
  143: 
    %% test2 should clean up test1's stale registration
    %% We don't mock the timestamp on the mim2 node, so the timestamps differ
  146:     State2 = init_and_get_nodes(mim2(), Opts2, []),
  147:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}}, State2),
  148:     State2A = get_nodes(mim2(), State2, [test2]),
  149:     #{last_query_info := #{last_rows := SelectedRows}} = State2A,
  150:     ?assertMatch(1, length(SelectedRows)).
  151: 
  152: rdbms_backend_node_doesnt_remove_itself(_Config) ->
  153:     Timestamp = month_ago(),
  154:     mock_timestamp(mim(), Timestamp),
  155:     CN = random_cluster_name(?FUNCTION_NAME),
  156:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  157:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  158: 
  159:     %% test1 row is written with an old (mocked) timestamp
  160:     State1 = init_and_get_nodes(mim(), Opts1, []),
  161:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),
  162: 
  163:     unmock_timestamp(mim()),
  164:     %% test1 row is not removed and timestamp is updated
  165:     State1A = get_nodes(mim(), State1, [test1]),
  166:     ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),
  167:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State1A),
  168: 
  169:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
  170:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State2).
  171: 
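%% Exercise the low-level DB queries (insert_new, select, update_existing,
%% delete_node_from_db) directly, bypassing the discovery state handling.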
  172: rdbms_backend_db_queries(_Config) ->
  173:     CN = random_cluster_name(?FUNCTION_NAME),
  174:     TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
  175:     TS2 = TS + 100,
  176: 
  177:     %% insertion fails if node name or node num is already added for the cluster
  178:     ?assertEqual({updated, 1}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
  179:     ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
  180:     ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 2, <<>>, TS)),
  181:     ?assertMatch({error, _}, insert_new(CN, <<"testB">>, 1, <<>>, TS)),
  182:     ?assertEqual({updated, 1}, insert_new(CN, <<"testB">>, 2, <<>>, TS)),
  183: 
  184:     %% insertion fails if node is a member of another cluster
  185:     ?assertMatch({error, _}, insert_new(<<"my-cluster">>, <<"testA">>, 1, <<>>, TS)),
  186: 
  187:     %% update of the timestamp works correctly
  188:     {selected, SelectedNodes1} = select(CN),
  189:     ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS}, {<<"testB">>, 2, <<>>, TS}]),
  190:                  lists:sort(SelectedNodes1)),
  191:     ?assertEqual({updated, 1}, update_existing(<<"testA">>, <<>>, TS2)),
  192:     {selected, SelectedNodes2} = select(CN),
  193:     ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS2}, {<<"testB">>, 2, <<>>, TS}]),
  194:                  lists:sort(SelectedNodes2)),
  195: 
  196:     %% node removal works correctly
  197:     ?assertEqual({updated, 1}, delete_node_from_db(<<"testA">>)),
  198:     ?assertEqual({selected, [{<<"testB">>, 2, <<>>, TS}]}, select(CN)).
  199: 
  200: rdbms_backend_publishes_node_ip(_Config) ->
    %% get_pairs returns only real, available nodes, so use the real node names
  202:     Node1b = atom_to_binary(maps:get(node, mim())),
  203:     Node2b = atom_to_binary(maps:get(node, mim2())),
  204:     CN = random_cluster_name(?FUNCTION_NAME),
  205:     Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
  206:               node_ip_binary => <<"127.0.0.1">>},
  207:     Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
  208:               node_ip_binary => <<"127.0.0.1">>},
  209:     State1 = disco_init(mim(), Opts1),
  210:     State2 = disco_init(mim2(), Opts2),
  211:     {{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  212:     {{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  213:     {{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  214:     {{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
  215:     {ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
  216:     {ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).
  217: 
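%% The following cases call mongoose_epmd:match_node_name/2 via RPC with a
%% hand-crafted backend state instead of going through the discovery process.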
  218: no_record_for_node(_Config) ->
  219:     Node = <<"mongoose@badhost">>,
  220:     BackState = #{address_pairs => #{}},
  221:     {error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
  222:     ok.
  223: 
  224: no_ip_in_db(_Config) ->
  225:     Node = <<"mongoose@noiphost">>,
  226:     BackState = #{address_pairs => #{Node => <<>>}},
  227:     {error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
  228:     ok.
  229: 
  230: epmd_just_returns_ip_from_db(_Config) ->
  231:     Node = <<"mongoose@noepmdhost">>,
  232:     %% IP from a test range
  233:     BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
  234:     {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node).
  235: 
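%% Name resolution for an unknown host fails with nxdomain.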
  236: address_please(_Config) ->
  237:     {error, nxdomain} =
  238:         rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
  239: 
address_please_returns_ip(_Config) ->
  241:     Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
  242:     Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
  243:     ct:log("system_info ~p", [Info]),
  244:     {ok, {192, 168, 115, 112}} = Res.
  245: 
address_please_returns_ip_fallbacks_to_resolve_with_file_backend(_Config) ->
  247:     Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
  248:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  249:     ct:log("system_info ~p", [Info]),
  250:     {ok, {127, 0, 0, 1}} = Res.
  251: 
address_please_returns_ip_127_0_0_1_from_db(_Config) ->
  253:     Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
  254:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  255:     ct:log("system_info ~p", [Info]),
  256:     {ok, {127, 0, 0, 1}} = Res.
  257: 
  258: %%--------------------------------------------------------------------
  259: %% Helpers
  260: %%--------------------------------------------------------------------
  261: 
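%% Call the RDBMS discovery backend over RPC and assert both the returned node set
%% and the already_registered flag from last_query_info.
%% init_and_get_nodes/3 covers the first call after init (already_registered = false),
%% get_nodes/3 covers subsequent calls (already_registered = true).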
  262: init_and_get_nodes(RPCNode, Opts, ExpectedNodes) ->
  263:     StateIn = disco_init(RPCNode, Opts),
  264:     get_nodes(RPCNode, StateIn, ExpectedNodes, false).
  265: 
  266: get_nodes(RPCNode, StateIn, ExpectedNodes) ->
  267:     get_nodes(RPCNode, StateIn, ExpectedNodes, true).
  268: 
  269: get_nodes(RPCNode, StateIn, ExpectedNodes, AlreadyRegistered) ->
  270:     {{ok, Nodes}, State} = disco_get_nodes(RPCNode, StateIn),
  271:     ?assertEqual(lists:sort(ExpectedNodes), lists:sort(Nodes)),
  272:     ?assertMatch(#{last_query_info := #{already_registered := AlreadyRegistered}}, State),
  273:     State.
  274: 
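%% Thin RPC wrappers around mongoose_cets_discovery_rdbms that log every request.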
  275: disco_init(Node, Opts) ->
  276:     State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
  277:     log_disco_request(?FUNCTION_NAME, Node, Opts, State),
  278:     State.
  279: 
  280: disco_get_nodes(Node, State) ->
  281:     NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
  282:     log_disco_request(?FUNCTION_NAME, Node, State, NewState),
  283:     NewState.
  284: 
  285: match_node_name(Node, SysInfo, NodeToLookup) ->
  286:     rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).
  287: 
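%% Log each backend call, numbering consecutive disco_get_nodes calls per
%% {node, cluster} pair using the process dictionary.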
  288: log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
  289:     ct:log("[0] disco_init(~p,~n" ++
  290:            "               ~p) =~n" ++
  291:            "        ~p",
  292:            [Node, Opts, State]),
  293:     erlang:put({disco, Node, CN}, 1);
  294: log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
  295:     N = case erlang:get({disco, Node, CN}) of
  296:             undefined -> 1;
  297:             Int when is_integer(Int) -> Int
  298:         end,
  299:     ct:log("[~p] disco_get_nodes(~p,~n" ++
  300:            "                    ~p) =~n" ++
  301:            "        ~p",
  302:            [N, Node, OldState, NewState]),
  303:     erlang:put({disco, Node, CN}, N+1).
  304: 
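%% Unix timestamps in seconds. month_ago/0 is used to simulate a stale registration.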
  305: timestamp() ->
  306:     os:system_time(second).
  307: 
  308: month_ago() ->
    timestamp() - (timer:hours(24 * 30) div 1000).
  310: 
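%% Mock mongoose_rdbms_timestamp:select/0 on the given node, so that discovery rows
%% are written with a fixed (old) timestamp.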
  311: mock_timestamp(Node, Timestamp) ->
  312:     ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
  313:     ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
    %% Ensure that the mock is in place
  315:     Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).
  316: 
  317: unmock_timestamp(Node) ->
  318:     ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
  319: 
  320: unmock(Node) ->
  321:     rpc(Node, meck, unload, []).
  322: 
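%% Generate a unique cluster name per test case run, so that test cases
%% do not see each other's rows in the discovery table.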
  323: random_cluster_name(CaseName) ->
  324:     Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
  325:     <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.
  326: 
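%% RPC wrappers (with logging) for the DB query functions of mongoose_cets_discovery_rdbms.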
  327: insert_new(CN, BinNode, NodeNum, Address, TS) ->
  328:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
  329:     ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
  330:     Ret.
  331: 
  332: select(CN) ->
  333:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
  334:     ct:log("select(~p) = ~p", [CN, Ret]),
  335:     Ret.
  336: 
  337: update_existing(BinNode, Address, TS) ->
  338:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [BinNode, Address, TS]),
  339:     ct:log("select(~p, ~p, ~p) = ~p", [BinNode, Address, TS, Ret]),
  340:     Ret.
  341: 
  342: delete_node_from_db(BinNode) ->
  343:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [BinNode]),
  344:     ct:log("delete_node_from_db(~p) = ~p", [BinNode, Ret]),
  345:     Ret.
  346: 
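%% Start a dedicated discovery child on both nodes with predefined node names and IPs,
%% then make the nodes register and see each other's records.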
  347: start_cets_discovery(Config) ->
  348:     start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
  349:     start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
  350:     force_nodes_to_see_each_other(mim(), mim2()),
  351:     Config.
  352: 
  353: start_cets_discovery_with_real_ips(Config) ->
  354:     start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
  355:     start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
  356:     force_nodes_to_see_each_other(mim(), mim2()),
  357:     Config.
  358: 
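%% The file backend is configured with a nonexistent file, so it publishes no IPs
%% and address resolution is expected to fall back to the regular resolver.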
start_cets_discovery_with_file_backend(Config) ->
  360:     start_disco(mim(), cets_disco_spec_for_file_backend()),
  361:     start_disco(mim2(), cets_disco_spec_for_file_backend()),
  362:     Config.
  363: 
  364: stop_cets_discovery() ->
  365:     ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
  366:     ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).
  367: 
  368: stop_and_delete_cets_discovery() ->
  369:     stop_cets_discovery(),
  370:     ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
  371:     ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).
  372: 
  373: stop_and_delete_cets_discovery_if_running() ->
  374:     case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
  375:         undefined ->
  376:             ok;
  377:         _ ->
  378:             stop_and_delete_cets_discovery()
  379:     end.
  380: 
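%% Restart the discovery process using the supervisor spec from the node's
%% default configuration, if it defines one.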
  381: restore_default_cets_discovery() ->
  382:     restore_default_cets_discovery(mim()),
  383:     restore_default_cets_discovery(mim2()).
  384: 
  385: restore_default_cets_discovery(Node) ->
  386:     case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
  387:         [] ->
  388:             ok;
  389:         [Spec] ->
  390:             start_disco(Node, Spec)
  391:     end.
  392: 
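%% Supervisor child specs for the test discovery process (RDBMS and file backends).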
  393: cets_disco_spec(Node, IP) ->
  394:     DiscoOpts = #{
  395:         backend_module => mongoose_cets_discovery_rdbms,
  396:         cluster_name => <<"mim">>,
  397:         node_name_to_insert => Node,
  398:         node_ip_binary => IP,
  399:         name => mongoose_cets_discovery},
    cets_disco_spec(DiscoOpts).
  401: 
  402: cets_disco_spec_for_file_backend() ->
  403:     DiscoOpts = #{
  404:         backend_module => cets_discovery_file,
  405:         disco_file => "/tmp/does_not_exist",
  406:         name => mongoose_cets_discovery},
    cets_disco_spec(DiscoOpts).
  408: 
  409: cets_disco_spec(DiscoOpts) ->
  410:      #{
  411:         id => cets_discovery,
  412:         start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
  413:         restart => temporary,
  414:         type => worker,
  415:         shutdown => infinity,
  416:         modules => [cets_discovery]}.
  417: 
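%% Sending 'check' asks the discovery process to refresh its node list;
%% wait_for_get_nodes/2 blocks until the refresh is finished.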
  418: send_check(Node) ->
  419:     rpc(Node, erlang, send, [mongoose_cets_discovery, check]).
  420: 
  421: wait_for_get_nodes(Node) ->
  422:     ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).
  423: 
  424: start_disco(Node, Spec) ->
  425:     {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).
  426: 
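%% A single check per node is not enough: Node2 registers first, then Node1
%% registers and sees Node2, then Node2 checks again to pick up Node1.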
  427: force_nodes_to_see_each_other(Node1, Node2) ->
  428:     send_check(Node2),
  429:     wait_for_get_nodes(Node2),
  430:     send_check(Node1),
  431:     wait_for_get_nodes(Node1),
  432:     send_check(Node2),
  433:     wait_for_get_nodes(Node2).