%% Common Test suite for CETS node-discovery backends:
%% the file backend (cets_discovery_file) and the RDBMS backend
%% (mongoose_cets_discovery_rdbms), including mongoose_epmd lookups.
-module(cets_disco_SUITE).
-compile([export_all, nowarn_export_all]).

%% rpc/4 runs a function on a remote test node; mim/0 and mim2/0 are the
%% two MongooseIM node specs used throughout this suite.
-import(distributed_helper, [mim/0, mim2/0, rpc/4]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").


%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
    [{group, file}, {group, rdbms}].

groups() ->
    [{file, [], file_cases()},
     {rdbms, [], rdbms_cases()}].

file_cases() ->
    [file_backend].

rdbms_cases() ->
    [rdbms_backend,
     rdbms_backend_supports_auto_cleaning,
     rdbms_backend_node_doesnt_remove_itself,
     rdbms_backend_db_queries,
     rdbms_backend_publishes_node_ip,
     no_record_for_node,
     no_ip_in_db,
     epmd_just_returns_ip_from_db,
     address_please,
     address_please_returns_ip,
     address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
     address_please_returns_ip_127_0_0_1_from_db].

suite() ->
    %% Both remote test nodes must be reachable for this suite to run.
    distributed_helper:require_rpc_nodes([mim, mim2]).

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

init_per_group(rdbms, Config) ->
    %% Run the rdbms group only when RDBMS is configured (or when executed
    %% outside a CT run); otherwise skip the whole group.
    case not ct_helper:is_ct_running()
         orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
        false -> {skip, rdbms_or_ct_not_running};
        true ->
            %% Make sure no leftover discovery process interferes with tests.
            stop_and_delete_cets_discovery_if_running(),
            Config
    end;
init_per_group(_, Config) ->
    Config.

end_per_group(rdbms, Config) ->
    %% Put back the discovery child spec the node was started with.
    restore_default_cets_discovery(),
    Config;
end_per_group(_, Config) ->
    Config.
%% Per-testcase setup: the three address_please_returns_ip* cases each need
%% a cets_discovery process started with a case-specific configuration.
init_per_testcase(address_please_returns_ip, Config) ->
    start_cets_discovery(Config);
init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
    %% NOTE(review): "backnend" is a typo in the helper name (defined below);
    %% renaming would have to change definition and call site together.
    start_cets_discovery_with_file_backnend(Config);
init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
    start_cets_discovery_with_real_ips(Config);
init_per_testcase(_CaseName, Config) -> Config.

%% Teardown: stop the per-case discovery process for the cases that started
%% one; for every other case, unload any meck mocks left on both nodes.
end_per_testcase(Name, Config) when Name == address_please_returns_ip;
                                    Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend;
                                    Name == address_please_returns_ip_127_0_0_1_from_db ->
    stop_cets_discovery(),
    Config;
end_per_testcase(_CaseName, Config) ->
    unmock(mim()),
    unmock(mim2()),
    %% Return Config, consistent with the clause above (previously the
    %% result of unmock/1 was returned by accident).
    Config.
%% The file backend reads the list of cluster nodes from a text file.
file_backend(Config) ->
    NodesFile = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
    BackendOpts = #{disco_file => NodesFile},
    InitState = rpc(mim(), cets_discovery_file, init, [BackendOpts]),
    {{ok, DiscoveredNodes}, _NewState} =
        rpc(mim(), cets_discovery_file, get_nodes, [InitState]),
    ExpectedNodes = lists:sort(['node1@localhost', 'node2@otherhost']),
    ?assertEqual(ExpectedNodes, lists:sort(DiscoveredNodes)).

%% Two nodes registering under the same cluster name discover each other
%% through the RDBMS-backed table.
rdbms_backend(_Config) ->
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    OptsA = #{cluster_name => ClusterName, node_name_to_insert => <<"test1">>},
    OptsB = #{cluster_name => ClusterName, node_name_to_insert => <<"test2">>},

    %% First registration of "test1": the table is still empty.
    StateA0 = disco_init(mim(), OptsA),
    {{ok, NodesA1}, StateA1} = disco_get_nodes(mim(), StateA0),
    ?assertMatch(#{last_query_info := #{already_registered := false}}, StateA1),
    ?assertEqual([], NodesA1),

    %% "test2" node can see "test1" on initial registration
    StateB0 = disco_init(mim2(), OptsB),
    {{ok, NodesB1}, StateB1} = disco_get_nodes(mim2(), StateB0),
    ?assertMatch(#{last_query_info := #{already_registered := false}}, StateB1),
    ?assertEqual([test1], NodesB1),

    %% "test2" node can see "test1" on update
    {{ok, NodesB2}, StateB2} = disco_get_nodes(mim2(), StateB1),
    ?assertEqual(lists:sort([test1, test2]), lists:sort(NodesB2)),
    ?assertMatch(#{last_query_info := #{already_registered := true}}, StateB2).
%% A row registered with a month-old timestamp is treated as expired and is
%% removed by another node's cleaning pass during get_nodes.
rdbms_backend_supports_auto_cleaning(_Config) ->
    OldTimestamp = month_ago(),
    mock_timestamp(mim(), OldTimestamp),
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    OptsA = #{cluster_name => ClusterName, node_name_to_insert => <<"test1">>},
    OptsB = #{cluster_name => ClusterName, node_name_to_insert => <<"test2">>},

    %% test1 row is written with an old (mocked) timestamp
    StateA0 = disco_init(mim(), OptsA),
    {{ok, NodesA1}, StateA1} = disco_get_nodes(mim(), StateA0),
    {{ok, NodesA2}, StateA2} = disco_get_nodes(mim(), StateA1),
    ?assertEqual([], NodesA1),
    ?assertEqual([test1], NodesA2),
    ?assertMatch(#{last_query_info := #{timestamp := OldTimestamp}}, StateA1),
    ?assertMatch(#{last_query_info := #{timestamp := OldTimestamp}}, StateA2),

    %% test2 would clean test1 registration
    %% We don't mock on mim2 node, so timestamps would differ
    StateB0 = disco_init(mim2(), OptsB),
    {{ok, NodesB1}, StateB1} = disco_get_nodes(mim2(), StateB0),
    ?assertEqual([], NodesB1),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}},
                 StateB1),
    %% Note: deliberately calls get_nodes with the pre-cleaning state again.
    {{ok, NodesB2}, StateB2} = disco_get_nodes(mim2(), StateB0),
    ?assertEqual([test2], NodesB2),
    #{last_query_info := #{last_rows := RemainingRows}} = StateB2,
    ?assertMatch(1, length(RemainingRows)).
%% A node refreshes its own stale row instead of cleaning it away, and other
%% nodes do not clean it either once the timestamp is current again.
rdbms_backend_node_doesnt_remove_itself(_Config) ->
    OldTimestamp = month_ago(),
    mock_timestamp(mim(), OldTimestamp),
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    OptsA = #{cluster_name => ClusterName, node_name_to_insert => <<"test1">>},
    OptsB = #{cluster_name => ClusterName, node_name_to_insert => <<"test2">>},

    %% test1 row is written with an old (mocked) timestamp
    StateA0 = disco_init(mim(), OptsA),
    {{ok, NodesA1}, StateA1} = disco_get_nodes(mim(), StateA0),
    ?assertEqual([], NodesA1),
    ?assertMatch(#{last_query_info := #{timestamp := OldTimestamp}}, StateA1),

    unmock_timestamp(mim()),
    %% test1 row is not removed and timestamp is updated
    {{ok, NodesA2}, StateA2} = disco_get_nodes(mim(), StateA1),
    ?assertNotMatch(#{last_query_info := #{timestamp := OldTimestamp}}, StateA2),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
                 StateA2),
    ?assertEqual([test1], NodesA2),

    %% A second node sees test1 and removes nothing.
    StateB0 = disco_init(mim2(), OptsB),
    {{ok, NodesB1}, StateB1} = disco_get_nodes(mim2(), StateB0),
    ?assertEqual([test1], NodesB1),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}},
                 StateB1).
%% Exercises the low-level RDBMS query helpers (insert/select/update/delete)
%% directly, bypassing the discovery state machine.
rdbms_backend_db_queries(_Config) ->
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    Now = rpc(mim(), mongoose_rdbms_timestamp, select, []),
    Later = Now + 100,

    %% insertion fails if node name or node num is already added for the cluster
    ?assertEqual({updated, 1}, insert_new(ClusterName, <<"test1">>, 1, <<>>, Now)),
    ?assertMatch({error, _}, insert_new(ClusterName, <<"test1">>, 1, <<>>, Now)),
    ?assertMatch({error, _}, insert_new(ClusterName, <<"test1">>, 2, <<>>, Now)),
    ?assertMatch({error, _}, insert_new(ClusterName, <<"test2">>, 1, <<>>, Now)),
    ?assertEqual({updated, 1}, insert_new(ClusterName, <<"test2">>, 2, <<>>, Now)),

    %% updating the timestamp works correctly
    {selected, RowsBefore} = select(ClusterName),
    ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, Now}, {<<"test2">>, 2, <<>>, Now}]),
                 lists:sort(RowsBefore)),
    ?assertEqual({updated, 1}, update_existing(ClusterName, <<"test1">>, <<>>, Later)),
    {selected, RowsAfter} = select(ClusterName),
    ?assertEqual(lists:sort([{<<"test1">>, 1, <<>>, Later}, {<<"test2">>, 2, <<>>, Now}]),
                 lists:sort(RowsAfter)),

    %% node removal works correctly
    ?assertEqual({updated, 1}, delete_node_from_db(ClusterName, <<"test1">>)),
    ?assertEqual({selected, [{<<"test2">>, 2, <<>>, Now}]}, select(ClusterName)).
%% Each node publishes its IP in the DB; afterwards match_node_name/3 on the
%% peer resolves the other node's name to that IP.
rdbms_backend_publishes_node_ip(_Config) ->
    %% get_pairs would return only real available nodes, so use the real node names
    NodeA = atom_to_binary(maps:get(node, mim())),
    NodeB = atom_to_binary(maps:get(node, mim2())),
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    OptsA = #{cluster_name => ClusterName, node_name_to_insert => NodeA,
              node_ip_binary => <<"127.0.0.1">>},
    OptsB = #{cluster_name => ClusterName, node_name_to_insert => NodeB,
              node_ip_binary => <<"127.0.0.1">>},
    StateA0 = disco_init(mim(), OptsA),
    StateB0 = disco_init(mim2(), OptsB),
    %% Two get_nodes rounds: first to register, second to read the peer's row.
    {{ok, _}, StateA1} = disco_get_nodes(mim(), StateA0),
    {{ok, _}, StateB1} = disco_get_nodes(mim2(), StateB0),
    {{ok, _}, StateA2} = disco_get_nodes(mim(), StateA1),
    {{ok, _}, StateB2} = disco_get_nodes(mim2(), StateB1),
    {ok, {127, 0, 0, 1}} = match_node_name(mim2(), StateB2, NodeA),
    {ok, {127, 0, 0, 1}} = match_node_name(mim(), StateA2, NodeB).

%% Unknown node name: lookup reports no record at all.
no_record_for_node(_Config) ->
    Node = <<"mongoose@badhost">>,
    EpmdState = #{address_pairs => #{}},
    {error, {no_record_for_node, Node}} = match_node_name(mim(), EpmdState, Node),
    ok.

%% Known node but empty IP field: lookup reports a missing IP.
no_ip_in_db(_Config) ->
    Node = <<"mongoose@noiphost">>,
    EpmdState = #{address_pairs => #{Node => <<>>}},
    {error, {no_ip_in_db, Node}} = match_node_name(mim(), EpmdState, Node),
    ok.

%% The IP stored in the DB is returned as-is, without consulting EPMD.
epmd_just_returns_ip_from_db(_Config) ->
    Node = <<"mongoose@noepmdhost">>,
    %% IP from a test range
    EpmdState = #{address_pairs => #{Node => <<"192.0.2.1">>}},
    {ok, {192, 0, 2, 1}} = match_node_name(mim(), EpmdState, Node).

%% Resolving an unknown host through mongoose_epmd fails with nxdomain.
address_please(_Config) ->
    {error, nxdomain} =
        rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
%% These three cases rely on the discovery process started by
%% init_per_testcase/2 and check mongoose_epmd:address_please/3 resolution.
%% Config was unused in all three — renamed to _Config to silence compiler
%% warnings, consistent with address_please/1 above.

%% The IP published in the DB for testmim2 is returned directly.
address_please_returns_ip(_Config) ->
    Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
    Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {192, 168, 115, 112}} = Res.

%% With the file backend there is no IP in the DB, so resolution falls back
%% to regular DNS and localhost resolves to 127.0.0.1.
address_please_returns_ip_fallbacks_to_resolve_with_file_backend(_Config) ->
    Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {127, 0, 0, 1}} = Res.

%% A loopback IP stored in the DB is returned as-is.
address_please_returns_ip_127_0_0_1_from_db(_Config) ->
    Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {127, 0, 0, 1}} = Res.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Initialise the RDBMS discovery backend on Node and log the call.
disco_init(Node, Opts) ->
    State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
    log_disco_request(?FUNCTION_NAME, Node, Opts, State),
    State.

%% Run one get_nodes pass of the RDBMS discovery backend on Node, logging
%% the state transition.
disco_get_nodes(Node, State) ->
    NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
    log_disco_request(?FUNCTION_NAME, Node, State, NewState),
    NewState.

%% Resolve NodeToLookup via mongoose_epmd using the given epmd state.
match_node_name(Node, SysInfo, NodeToLookup) ->
    rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).
%% Pretty-print each discovery backend call to the CT log, keeping a
%% per-{Node, ClusterName} call counter in the process dictionary.
log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
    ct:log("[0] disco_init(~p,~n" ++
           " ~p) =~n" ++
           " ~p",
           [Node, Opts, State]),
    erlang:put({disco, Node, CN}, 1);
log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
    Seq = case erlang:get({disco, Node, CN}) of
              undefined -> 1;
              Stored when is_integer(Stored) -> Stored
          end,
    ct:log("[~p] disco_get_nodes(~p,~n" ++
           " ~p) =~n" ++
           " ~p",
           [Seq, Node, OldState, NewState]),
    erlang:put({disco, Node, CN}, Seq + 1).

%% Current wall-clock time in seconds.
timestamp() ->
    os:system_time(second).

%% A timestamp 30 days in the past, in seconds.
%% Explicit parens: 'div' binds tighter than '-', so the ms-to-s conversion
%% of timer:hours/1 happens before the subtraction.
month_ago() ->
    timestamp() - (timer:hours(24 * 30) div 1000).

%% Replace mongoose_rdbms_timestamp:select/0 on Node with a fixed value.
mock_timestamp(Node, Timestamp) ->
    ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
    ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
    %% Ensure that we mock
    Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).

%% Remove the timestamp mock from Node.
unmock_timestamp(Node) ->
    ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).

%% Unload every meck mock on Node (best effort, result ignored).
unmock(Node) ->
    rpc(Node, meck, unload, []).

%% Build a unique cluster name per test case so runs never collide.
random_cluster_name(CaseName) ->
    Suffix = rpc(mim(), mongoose_bin, gen_from_crypto, []),
    <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Suffix/binary>>.

%% RPC wrapper around the insert_new DB query, with logging.
insert_new(CN, BinNode, NodeNum, Address, TS) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
    ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
    Ret.

%% RPC wrapper around the select DB query, with logging.
select(CN) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
    ct:log("select(~p) = ~p", [CN, Ret]),
    Ret.
%% RPC wrapper around the update_existing DB query, with logging.
update_existing(CN, BinNode, Address, TS) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [CN, BinNode, Address, TS]),
    %% Fixed log label: it previously said "select" (copy-paste error).
    ct:log("update_existing(~p, ~p, ~p, ~p) = ~p", [CN, BinNode, Address, TS, Ret]),
    Ret.

%% RPC wrapper around the delete_node_from_db DB query, with logging.
delete_node_from_db(CN, BinNode) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [CN, BinNode]),
    ct:log("delete_node_from_db(~p, ~p) = ~p", [CN, BinNode, Ret]),
    Ret.

%% Start RDBMS-backed discovery on both nodes with test node names and
%% test-range IPs, then wait until they register and see each other.
start_cets_discovery(Config) ->
    start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
    start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
    force_nodes_to_see_each_other(mim(), mim2()),
    Config.

%% Same as start_cets_discovery/1 but publishing loopback IPs.
start_cets_discovery_with_real_ips(Config) ->
    start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
    start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
    force_nodes_to_see_each_other(mim(), mim2()),
    Config.

%% Start file-backed discovery (pointing at a nonexistent file) on both nodes.
%% NOTE(review): "backnend" is a typo, but this name is called from
%% init_per_testcase/2, so renaming must update both places atomically.
start_cets_discovery_with_file_backnend(Config) ->
    start_disco(mim(), cets_disco_spec_for_file_backend()),
    start_disco(mim2(), cets_disco_spec_for_file_backend()),
    Config.

%% Stop the per-test discovery child on both nodes (spec stays registered).
stop_cets_discovery() ->
    ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
    ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).

%% Stop the discovery child and remove its spec from the supervisor.
stop_and_delete_cets_discovery() ->
    stop_cets_discovery(),
    ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
    ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).

%% Tear discovery down only if the process is actually registered on mim.
stop_and_delete_cets_discovery_if_running() ->
    case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
        undefined ->
            ok;
        _ ->
            stop_and_delete_cets_discovery()
    end.
%% Restore the default discovery child on both nodes after the rdbms group.
restore_default_cets_discovery() ->
    restore_default_cets_discovery(mim()),
    restore_default_cets_discovery(mim2()).

%% If the node knows its default discovery spec, start it again.
restore_default_cets_discovery(Node) ->
    case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
        [] ->
            ok;
        [DefaultSpec] ->
            start_disco(Node, DefaultSpec)
    end.

%% Child spec for an RDBMS-backed discovery process with the given node
%% name and IP to publish.
cets_disco_spec(NodeBin, IPBin) ->
    cets_disco_spec(#{name => mongoose_cets_discovery,
                      backend_module => mongoose_cets_discovery_rdbms,
                      cluster_name => <<"mim">>,
                      node_name_to_insert => NodeBin,
                      node_ip_binary => IPBin}).

%% Child spec for a file-backed discovery process whose file does not exist.
cets_disco_spec_for_file_backend() ->
    cets_disco_spec(#{name => mongoose_cets_discovery,
                      backend_module => cets_discovery_file,
                      disco_file => "/tmp/does_not_exist"}).

%% Wrap backend options into a supervisor child spec.
cets_disco_spec(Opts) ->
    #{id => cets_discovery,
      start => {mongoose_cets_discovery, start_link, [Opts]},
      restart => temporary,
      shutdown => infinity,
      type => worker,
      modules => [cets_discovery]}.

%% Trigger an immediate discovery check on Node.
send_check(Node) ->
    rpc(Node, erlang, send, [mongoose_cets_discovery, check]).

%% Block until the discovery process on Node finishes a get_nodes pass.
wait_for_get_nodes(Node) ->
    ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).

%% Start a discovery child under ejabberd_sup on Node; crash if it fails.
start_disco(Node, Spec) ->
    {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).

%% check/wait ping-pong: ensures both nodes have registered and have read
%% each other's rows before the test proceeds.
force_nodes_to_see_each_other(Node1, Node2) ->
    send_check(Node2),
    wait_for_get_nodes(Node2),
    send_check(Node1),
    wait_for_get_nodes(Node1),
    send_check(Node2),
    wait_for_get_nodes(Node2).