1: -module(cets_disco_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: 
    4: -import(distributed_helper, [mim/0, mim2/0, rpc/4]).
    5: -include_lib("common_test/include/ct.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: 
    8: 
    9: %%--------------------------------------------------------------------
   10: %% Suite configuration
   11: %%--------------------------------------------------------------------
   12: 
   13: all() ->
   14:     [{group, file}, {group, rdbms}].
   15: 
   16: groups() ->
   17:     [{file, [], file_cases()},
   18:      {rdbms, [], rdbms_cases()}].
   19: 
   20: file_cases() ->
   21:     [file_backend].
   22: 
   23: rdbms_cases() ->
   24:     [rdbms_backend,
   25:      rdbms_backend_supports_cluster_change,
   26:      rdbms_backend_supports_auto_cleaning,
   27:      rdbms_backend_node_doesnt_remove_itself,
   28:      rdbms_backend_db_queries,
   29:      rdbms_backend_publishes_node_ip,
   30:      no_record_for_node,
   31:      no_ip_in_db,
   32:      epmd_just_returns_ip_from_db,
   33:      address_please,
   34:      address_please_returns_ip,
   35:      address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
   36:      address_please_returns_ip_127_0_0_1_from_db].
   37: 
   38: suite() ->
   39:     distributed_helper:require_rpc_nodes([mim, mim2]).
   40: 
   41: %%--------------------------------------------------------------------
   42: %% Init & teardown
   43: %%--------------------------------------------------------------------
   44: 
   45: init_per_group(rdbms, Config) ->
   46:     case not ct_helper:is_ct_running()
   47:          orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
   48:         false -> {skip, rdbms_or_ct_not_running};
   49:         true ->
   50:             stop_and_delete_cets_discovery_if_running(),
   51:             Config
   52:     end;
   53: init_per_group(_, Config) ->
   54:     Config.
   55: 
   56: end_per_group(rdbms, Config) ->
   57:     restore_default_cets_discovery(),
   58:     Config;
   59: end_per_group(_, Config) ->
   60:     Config.
   61: 
   62: init_per_testcase(address_please_returns_ip, Config) ->
   63:     start_cets_discovery(Config);
   64: init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
   65:     start_cets_discovery_with_file_backnend(Config);
   66: init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
   67:     start_cets_discovery_with_real_ips(Config);
   68: init_per_testcase(_CaseName, Config) -> Config.
   69: 
   70: end_per_testcase(Name, Config) when Name == address_please_returns_ip;
   71:                                     Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
   72:                                     Name == address_please_returns_ip_127_0_0_1_from_db ->
   73:     stop_cets_discovery(),
   74:     Config;
   75: end_per_testcase(_CaseName, _Config) ->
   76:     unmock(mim()),
   77:     unmock(mim2()).
   78: 
   79: %%--------------------------------------------------------------------
   80: %% Test cases
   81: %%--------------------------------------------------------------------
   82: 
   83: file_backend(Config) ->
   84:     Path = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
   85:     Opts = #{disco_file => Path},
   86:     State = rpc(mim(), cets_discovery_file, init, [Opts]),
   87:     {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]),
   88:     ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)).
   89: 
   90: rdbms_backend(_Config) ->
   91:     CN = random_cluster_name(?FUNCTION_NAME),
   92:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
   93:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
   94: 
   95:     init_and_get_nodes(mim(), Opts1, []),
   96: 
   97:     %% "test2" node can see "test1" on initial registration
   98:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
   99: 
  100:     %% "test2" node can see "test1" on update
  101:     get_nodes(mim2(), State2, [test1, test2]).
  102: 
  103: rdbms_backend_supports_cluster_change(_Config) ->
  104:     CN1 = random_cluster_name(?FUNCTION_NAME),
  105:     CN2 = <<CN1/binary, "_new">>,
  106:     Opts1 = #{cluster_name => CN1, node_name_to_insert => <<"test1">>},
  107:     Opts2 = #{cluster_name => CN1, node_name_to_insert => <<"test2">>},
  108: 
  109:     %% Nodes test1 and test2 are in CN1, and they become connected
  110:     State1 = init_and_get_nodes(mim(), Opts1, []),
  111:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
  112:     get_nodes(mim(), State1, [test1, test2]),
  113: 
  114:     %% Node test1 moves to CN2, and the nodes are disconnected
  115:     NewState1 = init_and_get_nodes(mim(), Opts1#{cluster_name := CN2}, []),
  116:     get_nodes(mim2(), State2, [test2]),
  117:     NewState1A = get_nodes(mim(), NewState1, [test1]),
  118: 
  119:     %% Node test2 moves to CN2, and the nodes are connected again
  120:     init_and_get_nodes(mim2(), Opts2#{cluster_name := CN2}, [test1]),
  121:     get_nodes(mim(), NewState1A, [test1, test2]).
  122: 
  123: rdbms_backend_supports_auto_cleaning(_Config) ->
  124:     Timestamp = month_ago(),
  125:     mock_timestamp(mim(), Timestamp),
  126:     CN = random_cluster_name(?FUNCTION_NAME),
  127:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  128:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  129: 
  130:     %% test1 row is written with an old (mocked) timestamp
  131:     State1 = init_and_get_nodes(mim(), Opts1, []),
  132:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),
  133:     State1A = get_nodes(mim(), State1, [test1]),
  134:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),
  135: 
  136:     %% test2 would clean test1 registration
  137:     %% We don't mock on mim2 node, so timestamps would differ
  138:     State2 = init_and_get_nodes(mim2(), Opts2, []),
  139:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}}, State2),
  140:     State2A = get_nodes(mim2(), State2, [test2]),
  141:     #{last_query_info := #{last_rows := SelectedRows}} = State2A,
  142:     ?assertMatch(1, length(SelectedRows)).
  143: 
  144: rdbms_backend_node_doesnt_remove_itself(_Config) ->
  145:     Timestamp = month_ago(),
  146:     mock_timestamp(mim(), Timestamp),
  147:     CN = random_cluster_name(?FUNCTION_NAME),
  148:     Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
  149:     Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},
  150: 
  151:     %% test1 row is written with an old (mocked) timestamp
  152:     State1 = init_and_get_nodes(mim(), Opts1, []),
  153:     ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),
  154: 
  155:     unmock_timestamp(mim()),
  156:     %% test1 row is not removed and timestamp is updated
  157:     State1A = get_nodes(mim(), State1, [test1]),
  158:     ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),
  159:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State1A),
  160: 
  161:     State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
  162:     ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State2).
  163: 
  164: rdbms_backend_db_queries(_Config) ->
  165:     CN = random_cluster_name(?FUNCTION_NAME),
  166:     TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
  167:     TS2 = TS + 100,
  168: 
  169:     %% insertion fails if node name or node num is already added for the cluster
  170:     ?assertEqual({updated, 1}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
  171:     ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
  172:     ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 2, <<>>, TS)),
  173:     ?assertMatch({error, _}, insert_new(CN, <<"testB">>, 1, <<>>, TS)),
  174:     ?assertEqual({updated, 1}, insert_new(CN, <<"testB">>, 2, <<>>, TS)),
  175: 
  176:     %% insertion fails if node is a member of another cluster
  177:     ?assertMatch({error, _}, insert_new(<<"my-cluster">>, <<"testA">>, 1, <<>>, TS)),
  178: 
  179:     %% update of the timestamp works correctly
  180:     {selected, SelectedNodes1} = select(CN),
  181:     ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS}, {<<"testB">>, 2, <<>>, TS}]),
  182:                  lists:sort(SelectedNodes1)),
  183:     ?assertEqual({updated, 1}, update_existing(<<"testA">>, <<>>, TS2)),
  184:     {selected, SelectedNodes2} = select(CN),
  185:     ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS2}, {<<"testB">>, 2, <<>>, TS}]),
  186:                  lists:sort(SelectedNodes2)),
  187: 
  188:     %% node removal works correctly
  189:     ?assertEqual({updated, 1}, delete_node_from_db(<<"testA">>)),
  190:     ?assertEqual({selected, [{<<"testB">>, 2, <<>>, TS}]}, select(CN)).
  191: 
  192: rdbms_backend_publishes_node_ip(_Config) ->
  193:     %% get_pairs would return only real available nodes, so use the real node names
  194:     Node1b = atom_to_binary(maps:get(node, mim())),
  195:     Node2b = atom_to_binary(maps:get(node, mim2())),
  196:     CN = random_cluster_name(?FUNCTION_NAME),
  197:     Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b,
  198:               node_ip_binary => <<"127.0.0.1">>},
  199:     Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b,
  200:               node_ip_binary => <<"127.0.0.1">>},
  201:     State1 = disco_init(mim(), Opts1),
  202:     State2 = disco_init(mim2(), Opts2),
  203:     {{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1),
  204:     {{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2),
  205:     {{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2),
  206:     {{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2),
  207:     {ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b),
  208:     {ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b).
  209: 
  210: no_record_for_node(_Config) ->
  211:     Node = <<"mongoose@badhost">>,
  212:     BackState = #{address_pairs => #{}},
  213:     {error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node),
  214:     ok.
  215: 
  216: no_ip_in_db(_Config) ->
  217:     Node = <<"mongoose@noiphost">>,
  218:     BackState = #{address_pairs => #{Node => <<>>}},
  219:     {error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node),
  220:     ok.
  221: 
  222: epmd_just_returns_ip_from_db(_Config) ->
  223:     Node = <<"mongoose@noepmdhost">>,
  224:     %% IP from a test range
  225:     BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
  226:     {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node).
  227: 
  228: address_please(_Config) ->
  229:     {error, nxdomain} =
  230:         rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
  231: 
  232: address_please_returns_ip(Config) ->
  233:     Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
  234:     Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
  235:     ct:log("system_info ~p", [Info]),
  236:     {ok, {192, 168, 115, 112}} = Res.
  237: 
  238: address_please_returns_ip_fallbacks_to_resolve_with_file_backend(Config) ->
  239:     Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
  240:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  241:     ct:log("system_info ~p", [Info]),
  242:     {ok, {127, 0, 0, 1}} = Res.
  243: 
  244: address_please_returns_ip_127_0_0_1_from_db(Config) ->
  245:     Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
  246:     Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
  247:     ct:log("system_info ~p", [Info]),
  248:     {ok, {127, 0, 0, 1}} = Res.
  249: 
  250: %%--------------------------------------------------------------------
  251: %% Helpers
  252: %%--------------------------------------------------------------------
  253: 
  254: init_and_get_nodes(RPCNode, Opts, ExpectedNodes) ->
  255:     StateIn = disco_init(RPCNode, Opts),
  256:     get_nodes(RPCNode, StateIn, ExpectedNodes, false).
  257: 
  258: get_nodes(RPCNode, StateIn, ExpectedNodes) ->
  259:     get_nodes(RPCNode, StateIn, ExpectedNodes, true).
  260: 
  261: get_nodes(RPCNode, StateIn, ExpectedNodes, AlreadyRegistered) ->
  262:     {{ok, Nodes}, State} = disco_get_nodes(RPCNode, StateIn),
  263:     ?assertEqual(lists:sort(ExpectedNodes), lists:sort(Nodes)),
  264:     ?assertMatch(#{last_query_info := #{already_registered := AlreadyRegistered}}, State),
  265:     State.
  266: 
  267: disco_init(Node, Opts) ->
  268:     State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
  269:     log_disco_request(?FUNCTION_NAME, Node, Opts, State),
  270:     State.
  271: 
  272: disco_get_nodes(Node, State) ->
  273:     NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
  274:     log_disco_request(?FUNCTION_NAME, Node, State, NewState),
  275:     NewState.
  276: 
  277: match_node_name(Node, SysInfo, NodeToLookup) ->
  278:     rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).
  279: 
  280: log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
  281:     ct:log("[0] disco_init(~p,~n" ++
  282:            "               ~p) =~n" ++
  283:            "        ~p",
  284:            [Node, Opts, State]),
  285:     erlang:put({disco, Node, CN}, 1);
  286: log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
  287:     N = case erlang:get({disco, Node, CN}) of
  288:             undefined -> 1;
  289:             Int when is_integer(Int) -> Int
  290:         end,
  291:     ct:log("[~p] disco_get_nodes(~p,~n" ++
  292:            "                    ~p) =~n" ++
  293:            "        ~p",
  294:            [N, Node, OldState, NewState]),
  295:     erlang:put({disco, Node, CN}, N+1).
  296: 
  297: timestamp() ->
  298:     os:system_time(second).
  299: 
  300: month_ago() ->
  301:     timestamp() - timer:hours(24 * 30) div 1000.
  302: 
  303: mock_timestamp(Node, Timestamp) ->
  304:     ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
  305:     ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
  306:     %% Ensure that we mock
  307:     Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).
  308: 
  309: unmock_timestamp(Node) ->
  310:     ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).
  311: 
  312: unmock(Node) ->
  313:     rpc(Node, meck, unload, []).
  314: 
  315: random_cluster_name(CaseName) ->
  316:     Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
  317:     <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.
  318: 
  319: insert_new(CN, BinNode, NodeNum, Address, TS) ->
  320:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
  321:     ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
  322:     Ret.
  323: 
  324: select(CN) ->
  325:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
  326:     ct:log("select(~p) = ~p", [CN, Ret]),
  327:     Ret.
  328: 
  329: update_existing(BinNode, Address, TS) ->
  330:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [BinNode, Address, TS]),
  331:     ct:log("select(~p, ~p, ~p) = ~p", [BinNode, Address, TS, Ret]),
  332:     Ret.
  333: 
  334: delete_node_from_db(BinNode) ->
  335:     Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [BinNode]),
  336:     ct:log("delete_node_from_db(~p) = ~p", [BinNode, Ret]),
  337:     Ret.
  338: 
  339: start_cets_discovery(Config) ->
  340:     start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
  341:     start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
  342:     force_nodes_to_see_each_other(mim(), mim2()),
  343:     Config.
  344: 
  345: start_cets_discovery_with_real_ips(Config) ->
  346:     start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
  347:     start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
  348:     force_nodes_to_see_each_other(mim(), mim2()),
  349:     Config.
  350: 
  351: start_cets_discovery_with_file_backnend(Config) ->
  352:     start_disco(mim(), cets_disco_spec_for_file_backend()),
  353:     start_disco(mim2(), cets_disco_spec_for_file_backend()),
  354:     Config.
  355: 
  356: stop_cets_discovery() ->
  357:     ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
  358:     ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).
  359: 
  360: stop_and_delete_cets_discovery() ->
  361:     stop_cets_discovery(),
  362:     ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
  363:     ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).
  364: 
  365: stop_and_delete_cets_discovery_if_running() ->
  366:     case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
  367:         undefined ->
  368:             ok;
  369:         _ ->
  370:             stop_and_delete_cets_discovery()
  371:     end.
  372: 
  373: restore_default_cets_discovery() ->
  374:     restore_default_cets_discovery(mim()),
  375:     restore_default_cets_discovery(mim2()).
  376: 
  377: restore_default_cets_discovery(Node) ->
  378:     case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
  379:         [] ->
  380:             ok;
  381:         [Spec] ->
  382:             start_disco(Node, Spec)
  383:     end.
  384: 
  385: cets_disco_spec(Node, IP) ->
  386:     DiscoOpts = #{
  387:         backend_module => mongoose_cets_discovery_rdbms,
  388:         cluster_name => <<"mim">>,
  389:         node_name_to_insert => Node,
  390:         node_ip_binary => IP,
  391:         name => mongoose_cets_discovery},
  392:      cets_disco_spec(DiscoOpts).
  393: 
  394: cets_disco_spec_for_file_backend() ->
  395:     DiscoOpts = #{
  396:         backend_module => cets_discovery_file,
  397:         disco_file => "/tmp/does_not_exist",
  398:         name => mongoose_cets_discovery},
  399:      cets_disco_spec(DiscoOpts).
  400: 
  401: cets_disco_spec(DiscoOpts) ->
  402:      #{
  403:         id => cets_discovery,
  404:         start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
  405:         restart => temporary,
  406:         type => worker,
  407:         shutdown => infinity,
  408:         modules => [cets_discovery]}.
  409: 
  410: send_check(Node) ->
  411:     rpc(Node, erlang, send, [mongoose_cets_discovery, check]).
  412: 
  413: wait_for_get_nodes(Node) ->
  414:     ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).
  415: 
  416: start_disco(Node, Spec) ->
  417:     {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).
  418: 
  419: force_nodes_to_see_each_other(Node1, Node2) ->
  420:     send_check(Node2),
  421:     wait_for_get_nodes(Node2),
  422:     send_check(Node1),
  423:     wait_for_get_nodes(Node1),
  424:     send_check(Node2),
  425:     wait_for_get_nodes(Node2).