1: -module(cets_disco_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: 
    4: -import(distributed_helper, [mim/0, mim2/0, rpc/4]).
    5: -include_lib("common_test/include/ct.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: 
    8: 
    9: %%--------------------------------------------------------------------
   10: %% Suite configuration
   11: %%--------------------------------------------------------------------
   12: 
   13: all() ->
   14:     [{group, file}, {group, rdbms}].
   15: 
   16: groups() ->
   17:     [{file, [], file_cases()},
   18:      {rdbms, [], rdbms_cases()}].
   19: 
   20: file_cases() ->
   21:     [file_backend].
   22: 
   23: rdbms_cases() ->
   24:     [rdbms_backend,
   25:      rdbms_backend_supports_auto_cleaning,
   26:      rdbms_backend_node_doesnt_remove_itself,
   27:      rdbms_backend_db_queries,
   28:      rdbms_backend_publishes_node_ip,
   29:      no_record_for_node,
   30:      no_ip_in_db,
   31:      epmd_just_returns_ip_from_db,
   32:      address_please,
   33:      address_please_returns_ip,
   34:      address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
   35:      address_please_returns_ip_127_0_0_1_from_db].
   36: 
   37: suite() ->
   38:     distributed_helper:require_rpc_nodes([mim, mim2]).
   39: 
   40: %%--------------------------------------------------------------------
   41: %% Init & teardown
   42: %%--------------------------------------------------------------------
   43: 
   44: init_per_group(rdbms, Config) ->
   45:     case not ct_helper:is_ct_running()
   46:          orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
   47:         false -> {skip, rdbms_or_ct_not_running};
   48:         true ->
   49:             stop_and_delete_cets_discovery_if_running(),
   50:             Config
   51:     end;
   52: init_per_group(_, Config) ->
   53:     Config.
   54: 
   55: end_per_group(rdbms, Config) ->
   56:     restore_default_cets_discovery(),
   57:     Config;
   58: end_per_group(_, Config) ->
   59:     Config.
   60: 
   61: init_per_testcase(address_please_returns_ip, Config) ->
   62:     start_cets_discovery(Config);
   63: init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
   64:     start_cets_discovery_with_file_backnend(Config);
   65: init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
   66:     start_cets_discovery_with_real_ips(Config);
   67: init_per_testcase(_CaseName, Config) -> Config.
   68: 
   69: end_per_testcase(Name, Config) when Name == address_please_returns_ip;
   70:                                     Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
   71:                                     Name == address_please_returns_ip_127_0_0_1_from_db ->
   72:     stop_cets_discovery(),
   73:     Config;
   74: end_per_testcase(_CaseName, Config) ->
   75:     unmock(mim()),
   76:     unmock(mim2()).
   77: 
   78: %%--------------------------------------------------------------------
   79: %% Test cases
   80: %%--------------------------------------------------------------------
   81: 
   82: file_backend(Config) ->
   83:     Path = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
   84:     Opts = #{disco_file => Path},
   85:     State = rpc(mim(), cets_discovery_file, init, [Opts]),
   86:     {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]),
   87:     ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)).
   88: 
   89: rdbms_backend(_Config) ->
   90:     CN = random_cluster_name(?FUNCTION_NAME),
   91:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
   92:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
   93: 
   94:     State1 = disco_init(mim(), Opts1),
   95:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
   96:     ?assertMatch(#{last_query_info := #{already_registered := false}}, State1_2),
   97:     ?assertEqual([], Nodes1_2),
   98: 
   99:     %% "test2" node can see "test1" on initial registration
  100:     State2 = disco_init(mim2(), Opts2),
  101:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  102:     ?assertMatch(#{last_query_info := #{already_registered := false}}, State2_2),
  103:     ?assertEqual([test1], Nodes2_2),
  104: 
  105:     %% "test2" node can see "test1" on update
  106:     {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
  107:     ?assertEqual(lists:sort([test1, test2]), lists:sort(Nodes2_3)),
  108:     ?assertMatch(#{last_query_info := #{already_registered := true}}, State2_3).
  109: 
  110: rdbms_backend_supports_auto_cleaning(_Config) ->
  111:     Timestamp = month_ago(),
  112:     mock_timestamp(mim(), Timestamp),
  113:     CN = random_cluster_name(?FUNCTION_NAME),
  114:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  115:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  116: 
  117:     %% test1 row is written with an old (mocked) timestamp
  118:     State1 = disco_init(mim(), Opts1),
  119:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  120:     {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  121:     ?assertEqual([], Nodes1_2),
  122:     ?assertEqual([test1], Nodes1_3),
  123:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2),
  124:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3),
  125: 
  126:     %% test2 would clean test1 registration
  127:     %% We don't mock on mim2 node, so timestamps would differ
  128:     State2 = disco_init(mim2(), Opts2),
  129:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  130:     ?assertEqual([], Nodes2_2),
  131:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}},
  132:                  State2_2),
  133:     {{ok, Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2),
  134:     ?assertEqual([test2], Nodes2_3),
  135:     #{last_query_info := #{last_rows := SelectedRows}} = State2_3,
  136:     ?assertMatch(1, length(SelectedRows)).
  137: 
  138: rdbms_backend_node_doesnt_remove_itself(_Config) ->
  139:     Timestamp = month_ago(),
  140:     mock_timestamp(mim(), Timestamp),
  141:     CN = random_cluster_name(?FUNCTION_NAME),
  142:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  143:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  144: 
  145:     %% test1 row is written with an old (mocked) timestamp
  146:     State1 = disco_init(mim(), Opts1),
  147:     {{ok, Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  148:     ?assertEqual([], Nodes1_2),
  149:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_2),
  150: 
  151:     unmock_timestamp(mim()),
  152:     %% test1 row is not removed and timestamp is updated
  153:     {{ok, Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  154:     ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1_3),
  155:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
  156:                  State1_3),
  157:     ?assertEqual([test1], Nodes1_3),
  158: 
  159:     State2 = disco_init(mim2(), Opts2),
  160:     {{ok, Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  161:     ?assertEqual([test1], Nodes2_2),
  162:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
  163:                  State2_2).
  164: 
  165: rdbms_backend_db_queries(_Config) ->
  166:     CN = random_cluster_name(?FUNCTION_NAME),
  167:     TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
  168:     TS2 = TS + 100,
  169: 
  170:     %% insertion fails if node name or node num is already added for the cluster
  171:     ?assertEqual({updated, 1}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
  172:     ?assertMatch({error, _}, insert_new(CN, <<"test1">>, 1, <<>>, TS)),
  173:     ?assertMatch({error, _}, insert_new(CN, <<"test1">>, 2, <<>>, TS)),
  174:     ?assertMatch({error, _}, insert_new(CN, <<"test2">>, 1, <<>>, TS)),
  175:     ?assertEqual({updated, 1}, insert_new(CN, <<"test2">>, 2, <<>>, TS)),
  176: 
  177:     %% update of the timestamp works correctly
  178:     {selected, SelectedNodes1} = select(CN),
  179:     ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS}, {<<"test2">>, 2, <<>>, TS}]),
  180:                  lists:sort(SelectedNodes1)),
  181:     ?assertEqual({updated, 1}, update_existing(CN, <<"test1">>, <<>>, TS2)),
  182:     {selected, SelectedNodes2} = select(CN),
  183:     ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, TS2}, {<<"test2">>, 2, <<>>, TS}]),
  184:                  lists:sort(SelectedNodes2)),
  185: 
  186:     %% node removal work correctly
  187:     ?assertEqual({updated, 1}, delete_node_from_db(CN, <<"test1">>)),
  188:     ?assertEqual({selected, [{<<"test2">>, 2, <<>>, TS}]}, select(CN)).
  189: 
  190: rdbms_backend_publishes_node_ip(_Config) ->
  191:     %% get_pairs would return only real available nodes, so use the real node names
  192:     Node1b = atom_to_binary(maps:get(node, mim())),
  193:     Node2b = atom_to_binary(maps:get(node, mim2())),
  194:     CN = random_cluster_name(?FUNCTION_NAME),
  195:     Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
  196:               node_ip_binary => <<"127.0.0.1">>},
  197:     Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
  198:               node_ip_binary => <<"127.0.0.1">>},
  199:     State1 = disco_init(mim(), Opts1),
  200:     State2 = disco_init(mim2(), Opts2),
  201:     {{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  202:     {{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  203:     {{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  204:     {{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
  205:     {ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
  206:     {ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).
  207: 
  208: no_record_for_node(_Config) ->
  209:     Node = <<"mongoose@badhost">>,
  210:     BackState = #{address_pairs => #{}},
  211:     {error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
  212:     ok.
  213: 
  214: no_ip_in_db(_Config) ->
  215:     Node = <<"mongoose@noiphost">>,
  216:     BackState = #{address_pairs => #{Node => <<>>}},
  217:     {error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
  218:     ok.
  219: 
  220: epmd_just_returns_ip_from_db(_Config) ->
  221:     Node = <<"mongoose@noepmdhost">>,
  222:     %% IP from a test range
  223:     BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
  224:     {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node).
  225: 
  226: address_please(_Config) ->
  227:     {error, nxdomain} =
  228:         rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
  229: 
  230: address_please_returns_ip(Config) ->
  231:     Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
  232:     Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
  233:     ct:log("system_info ~p", [Info]),
  234:     {ok, {192, 168, 115, 112}} = Res.
  235: 
  236: address_please_returns_ip_fallbacks_to_resolve_with_file_backend(Config) ->
  237:     Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
  238:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  239:     ct:log("system_info ~p", [Info]),
  240:     {ok, {127, 0, 0, 1}} = Res.
  241: 
  242: address_please_returns_ip_127_0_0_1_from_db(Config) ->
  243:     Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
  244:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  245:     ct:log("system_info ~p", [Info]),
  246:     {ok, {127, 0, 0, 1}} = Res.
  247: 
  248: %%--------------------------------------------------------------------
  249: %% Helpers
  250: %%--------------------------------------------------------------------
  251: 
  252: disco_init(Node, Opts) ->
  253:     State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
  254:     log_disco_request(?FUNCTION_NAME, Node, Opts, State),
  255:     State.
  256: 
  257: disco_get_nodes(Node, State) ->
  258:     NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
  259:     log_disco_request(?FUNCTION_NAME, Node, State, NewState),
  260:     NewState.
  261: 
  262: match_node_name(Node, SysInfo, NodeToLookup) ->
  263:     rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).
  264: 
  265: log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
  266:     ct:log("[0] disco_init(~p,~n" ++
  267:            "               ~p) =~n" ++
  268:            "        ~p",
  269:            [Node, Opts, State]),
  270:     erlang:put({disco, Node, CN}, 1);
  271: log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
  272:     N = case erlang:get({disco, Node, CN}) of
  273:             undefined -> 1;
  274:             Int when is_integer(Int) -> Int
  275:         end,
  276:     ct:log("[~p] disco_get_nodes(~p,~n" ++
  277:            "                    ~p) =~n" ++
  278:            "        ~p",
  279:            [N, Node, OldState, NewState]),
  280:     erlang:put({disco, Node, CN}, N+1).
  281: 
  282: timestamp() ->
  283:     os:system_time(second).
  284: 
  285: month_ago() ->
  286:     timestamp() - timer:hours(24 * 30) div 1000.
  287: 
  288: mock_timestamp(Node, Timestamp) ->
  289:     ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
  290:     ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
  291:     %% Ensure that we mock
  292:     Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).
  293: 
  294: unmock_timestamp(Node) ->
  295:     ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
  296: 
  297: unmock(Node) ->
  298:     rpc(Node, meck, unload, []).
  299: 
  300: random_cluster_name(CaseName) ->
  301:     Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
  302:     <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.
  303: 
  304: insert_new(CN, BinNode, NodeNum, Address, TS) ->
  305:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
  306:     ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
  307:     Ret.
  308: 
  309: select(CN) ->
  310:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
  311:     ct:log("select(~p) = ~p", [CN, Ret]),
  312:     Ret.
  313: 
  314: update_existing(CN, BinNode, Address, TS) ->
  315:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, Address, TS]),
  316:     ct:log("select(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
  317:     Ret.
  318: 
  319: delete_node_from_db(CN, BinNode) ->
  320:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
  321:     ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
  322:     Ret.
  323: 
  324: start_cets_discovery(Config) ->
  325:     start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
  326:     start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
  327:     force_nodes_to_see_each_other(mim(), mim2()),
  328:     Config.
  329: 
  330: start_cets_discovery_with_real_ips(Config) ->
  331:     start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
  332:     start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
  333:     force_nodes_to_see_each_other(mim(), mim2()),
  334:     Config.
  335: 
  336: start_cets_discovery_with_file_backnend(Config) ->
  337:     start_disco(mim(), cets_disco_spec_for_file_backend()),
  338:     start_disco(mim2(), cets_disco_spec_for_file_backend()),
  339:     Config.
  340: 
  341: stop_cets_discovery() ->
  342:     ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
  343:     ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).
  344: 
  345: stop_and_delete_cets_discovery() ->
  346:     stop_cets_discovery(),
  347:     ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
  348:     ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).
  349: 
  350: stop_and_delete_cets_discovery_if_running() ->
  351:     case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
  352:         undefined ->
  353:             ok;
  354:         _ ->
  355:             stop_and_delete_cets_discovery()
  356:     end.
  357: 
  358: restore_default_cets_discovery() ->
  359:     restore_default_cets_discovery(mim()),
  360:     restore_default_cets_discovery(mim2()).
  361: 
  362: restore_default_cets_discovery(Node) ->
  363:     case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
  364:         [] ->
  365:             ok;
  366:         [Spec] ->
  367:             start_disco(Node, Spec)
  368:     end.
  369: 
  370: cets_disco_spec(Node, IP) ->
  371:     DiscoOpts = #{
  372:         backend_module => mongoose_cets_discovery_rdbms,
  373:         cluster_name => <<"mim">>,
  374:         node_name_to_insert => Node,
  375:         node_ip_binary => IP,
  376:         name => mongoose_cets_discovery},
  377:      cets_disco_spec(DiscoOpts).
  378: 
  379: cets_disco_spec_for_file_backend() ->
  380:     DiscoOpts = #{
  381:         backend_module => cets_discovery_file,
  382:         disco_file => "/tmp/does_not_exist",
  383:         name => mongoose_cets_discovery},
  384:      cets_disco_spec(DiscoOpts).
  385: 
  386: cets_disco_spec(DiscoOpts) ->
  387:      #{
  388:         id => cets_discovery,
  389:         start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
  390:         restart => temporary,
  391:         type => worker,
  392:         shutdown => infinity,
  393:         modules => [cets_discovery]}.
  394: 
  395: send_check(Node) ->
  396:     rpc(Node, erlang, send, [mongoose_cets_discovery, check]).
  397: 
  398: wait_for_get_nodes(Node) ->
  399:     ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).
  400: 
  401: start_disco(Node, Spec) ->
  402:     {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).
  403: 
  404: force_nodes_to_see_each_other(Node1, Node2) ->
  405:     send_check(Node2),
  406:     wait_for_get_nodes(Node2),
  407:     send_check(Node1),
  408:     wait_for_get_nodes(Node1),
  409:     send_check(Node2),
  410:     wait_for_get_nodes(Node2).