1: -module(cets_disco_SUITE). 2: -compile([export_all, nowarn_export_all]). 3: 4: -import(distributed_helper, [mim/0, mim2/0, rpc/4]). 5: -include_lib("common_test/include/ct.hrl"). 6: -include_lib("eunit/include/eunit.hrl"). 7: 8: 9: %%-------------------------------------------------------------------- 10: %% Suite configuration 11: %%-------------------------------------------------------------------- 12: 13: all() -> 14: [{group, file}, {group, rdbms}]. 15: 16: groups() -> 17: [{file, [], file_cases()}, 18: {rdbms, [], rdbms_cases()}]. 19: 20: file_cases() -> 21: [file_backend]. 22: 23: rdbms_cases() -> 24: [rdbms_backend, 25: rdbms_backend_supports_cluster_change, 26: rdbms_backend_cluster_name_contains_cets_version, 27: rdbms_backend_supports_auto_cleaning, 28: rdbms_backend_node_doesnt_remove_itself, 29: rdbms_backend_db_queries, 30: rdbms_backend_publishes_node_ip, 31: no_record_for_node, 32: no_ip_in_db, 33: epmd_just_returns_ip_from_db, 34: address_please, 35: address_please_returns_ip, 36: address_please_returns_ip_fallbacks_to_resolve_with_file_backend, 37: address_please_returns_ip_127_0_0_1_from_db]. 38: 39: suite() -> 40: distributed_helper:require_rpc_nodes([mim, mim2]). 41: 42: %%-------------------------------------------------------------------- 43: %% Init & teardown 44: %%-------------------------------------------------------------------- 45: 46: init_per_group(rdbms, Config) -> 47: case not ct_helper:is_ct_running() 48: orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of 49: false -> {skip, rdbms_or_ct_not_running}; 50: true -> 51: stop_and_delete_cets_discovery_if_running(), 52: Config 53: end; 54: init_per_group(_, Config) -> 55: Config. 56: 57: end_per_group(rdbms, Config) -> 58: restore_default_cets_discovery(), 59: Config; 60: end_per_group(_, Config) -> 61: Config. 62: 63: init_per_testcase(address_please_returns_ip, Config) -> 64: start_cets_discovery(Config); 65: init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) -> 66: start_cets_discovery_with_file_backnend(Config); 67: init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) -> 68: start_cets_discovery_with_real_ips(Config); 69: init_per_testcase(_CaseName, Config) -> Config. 70: 71: end_per_testcase(Name, Config) when Name == address_please_returns_ip; 72: Name == address_please_returns_ip_fallbacks_to_resolve_with_file_backend; 73: Name == address_please_returns_ip_127_0_0_1_from_db -> 74: stop_cets_discovery(), 75: Config; 76: end_per_testcase(_CaseName, _Config) -> 77: unmock(mim()), 78: unmock(mim2()). 79: 80: %%-------------------------------------------------------------------- 81: %% Test cases 82: %%-------------------------------------------------------------------- 83: 84: file_backend(Config) -> 85: Path = filename:join(?config(mim_data_dir, Config), "nodes.txt"), 86: Opts = #{disco_file => Path}, 87: State = rpc(mim(), cets_discovery_file, init, [Opts]), 88: {{ok, Nodes}, _} = rpc(mim(), cets_discovery_file, get_nodes, [State]), 89: ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']), lists:sort(Nodes)). 90: 91: rdbms_backend(_Config) -> 92: CN = random_cluster_name(?FUNCTION_NAME), 93: Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, 94: Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, 95: 96: init_and_get_nodes(mim(), Opts1, []), 97: 98: %% "test2" node can see "test1" on initial registration 99: State2 = init_and_get_nodes(mim2(), Opts2, [test1]), 100: 101: %% "test2" node can see "test1" on update 102: get_nodes(mim2(), State2, [test1, test2]). 103: 104: rdbms_backend_supports_cluster_change(_Config) -> 105: CN1 = random_cluster_name(?FUNCTION_NAME), 106: CN2 = <<CN1/binary, "_new">>, 107: Opts1 = #{cluster_name => CN1, node_name_to_insert => <<"test1">>}, 108: Opts2 = #{cluster_name => CN1, node_name_to_insert => <<"test2">>}, 109: 110: %% Nodes test1 and test2 are in CN1, and they become connected 111: State1 = init_and_get_nodes(mim(), Opts1, []), 112: State2 = init_and_get_nodes(mim2(), Opts2, [test1]), 113: get_nodes(mim(), State1, [test1, test2]), 114: 115: %% Node test1 moves to CN2, and the nodes are disconnected 116: NewState1 = init_and_get_nodes(mim(), Opts1#{cluster_name := CN2}, []), 117: get_nodes(mim2(), State2, [test2]), 118: NewState1A = get_nodes(mim(), NewState1, [test1]), 119: 120: %% Node test2 moves to CN2, and the nodes are connected again 121: init_and_get_nodes(mim2(), Opts2#{cluster_name := CN2}, [test1]), 122: get_nodes(mim(), NewState1A, [test1, test2]). 123: 124: rdbms_backend_cluster_name_contains_cets_version(_Config) -> 125: CN = random_cluster_name(?FUNCTION_NAME), 126: Opts = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, 127: #{cluster_name := CNWithVsn} = init_and_get_nodes(mim(), Opts, []), 128: [<<>>, Vsn] = binary:split(CNWithVsn, CN), 129: ?assertMatch({match, _}, re:run(Vsn, "-[0-9]+\\.[0-9]+")). 130: 131: rdbms_backend_supports_auto_cleaning(_Config) -> 132: Timestamp = month_ago(), 133: mock_timestamp(mim(), Timestamp), 134: CN = random_cluster_name(?FUNCTION_NAME), 135: Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, 136: Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, 137: 138: %% test1 row is written with an old (mocked) timestamp 139: State1 = init_and_get_nodes(mim(), Opts1, []), 140: ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1), 141: State1A = get_nodes(mim(), State1, [test1]), 142: ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A), 143: 144: %% test2 would clean test1 registration 145: %% We don't mock on mim2 node, so timestamps would differ 146: State2 = init_and_get_nodes(mim2(), Opts2, []), 147: ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}}, State2), 148: State2A = get_nodes(mim2(), State2, [test2]), 149: #{last_query_info := #{last_rows := SelectedRows}} = State2A, 150: ?assertMatch(1, length(SelectedRows)). 151: 152: rdbms_backend_node_doesnt_remove_itself(_Config) -> 153: Timestamp = month_ago(), 154: mock_timestamp(mim(), Timestamp), 155: CN = random_cluster_name(?FUNCTION_NAME), 156: Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>}, 157: Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>}, 158: 159: %% test1 row is written with an old (mocked) timestamp 160: State1 = init_and_get_nodes(mim(), Opts1, []), 161: ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1), 162: 163: unmock_timestamp(mim()), 164: %% test1 row is not removed and timestamp is updated 165: State1A = get_nodes(mim(), State1, [test1]), 166: ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A), 167: ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State1A), 168: 169: State2 = init_and_get_nodes(mim2(), Opts2, [test1]), 170: ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State2). 171: 172: rdbms_backend_db_queries(_Config) -> 173: CN = random_cluster_name(?FUNCTION_NAME), 174: TS = rpc(mim(), mongoose_rdbms_timestamp, select, []), 175: TS2 = TS + 100, 176: 177: %% insertion fails if node name or node num is already added for the cluster 178: ?assertEqual({updated, 1}, insert_new(CN, <<"testA">>, 1, <<>>, TS)), 179: ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 1, <<>>, TS)), 180: ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 2, <<>>, TS)), 181: ?assertMatch({error, _}, insert_new(CN, <<"testB">>, 1, <<>>, TS)), 182: ?assertEqual({updated, 1}, insert_new(CN, <<"testB">>, 2, <<>>, TS)), 183: 184: %% insertion fails if node is a member of another cluster 185: ?assertMatch({error, _}, insert_new(<<"my-cluster">>, <<"testA">>, 1, <<>>, TS)), 186: 187: %% update of the timestamp works correctly 188: {selected, SelectedNodes1} = select(CN), 189: ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS}, {<<"testB">>, 2, <<>>, TS}]), 190: lists:sort(SelectedNodes1)), 191: ?assertEqual({updated, 1}, update_existing(<<"testA">>, <<>>, TS2)), 192: {selected, SelectedNodes2} = select(CN), 193: ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS2}, {<<"testB">>, 2, <<>>, TS}]), 194: lists:sort(SelectedNodes2)), 195: 196: %% node removal works correctly 197: ?assertEqual({updated, 1}, delete_node_from_db(<<"testA">>)), 198: ?assertEqual({selected, [{<<"testB">>, 2, <<>>, TS}]}, select(CN)). 199: 200: rdbms_backend_publishes_node_ip(_Config) -> 201: %% get_pairs would return only real available nodes, so use the real node names 202: Node1b = atom_to_binary(maps:get(node, mim())), 203: Node2b = atom_to_binary(maps:get(node, mim2())), 204: CN = random_cluster_name(?FUNCTION_NAME), 205: Opts1 = #{cluster_name => CN, node_name_to_insert => Node1b, 206: node_ip_binary => <<"127.0.0.1">>}, 207: Opts2 = #{cluster_name => CN, node_name_to_insert => Node2b, 208: node_ip_binary => <<"127.0.0.1">>}, 209: State1 = disco_init(mim(), Opts1), 210: State2 = disco_init(mim2(), Opts2), 211: {{ok, _Nodes1_2}, State1_2} = disco_get_nodes(mim(), State1), 212: {{ok, _Nodes2_2}, State2_2} = disco_get_nodes(mim2(), State2), 213: {{ok, _Nodes1_3}, State1_3} = disco_get_nodes(mim(), State1_2), 214: {{ok, _Nodes2_3}, State2_3} = disco_get_nodes(mim2(), State2_2), 215: {ok, {127, 0, 0, 1}} = match_node_name(mim2(), State2_3, Node1b), 216: {ok, {127, 0, 0, 1}} = match_node_name(mim(), State1_3, Node2b). 217: 218: no_record_for_node(_Config) -> 219: Node = <<"mongoose@badhost">>, 220: BackState = #{address_pairs => #{}}, 221: {error, {no_record_for_node, Node}} = match_node_name(mim(), BackState, Node), 222: ok. 223: 224: no_ip_in_db(_Config) -> 225: Node = <<"mongoose@noiphost">>, 226: BackState = #{address_pairs => #{Node => <<>>}}, 227: {error, {no_ip_in_db, Node}} = match_node_name(mim(), BackState, Node), 228: ok. 229: 230: epmd_just_returns_ip_from_db(_Config) -> 231: Node = <<"mongoose@noepmdhost">>, 232: %% IP from a test range 233: BackState = #{address_pairs => #{Node => <<"192.0.2.1">>}}, 234: {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackState, Node). 235: 236: address_please(_Config) -> 237: {error, nxdomain} = 238: rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]). 239: 240: address_please_returns_ip(Config) -> 241: Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]), 242: Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]), 243: ct:log("system_info ~p", [Info]), 244: {ok, {192, 168, 115, 112}} = Res. 245: 246: address_please_returns_ip_fallbacks_to_resolve_with_file_backend(Config) -> 247: Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]), 248: Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]), 249: ct:log("system_info ~p", [Info]), 250: {ok, {127, 0, 0, 1}} = Res. 251: 252: address_please_returns_ip_127_0_0_1_from_db(Config) -> 253: Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]), 254: Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]), 255: ct:log("system_info ~p", [Info]), 256: {ok, {127, 0, 0, 1}} = Res. 257: 258: %%-------------------------------------------------------------------- 259: %% Helpers 260: %%-------------------------------------------------------------------- 261: 262: init_and_get_nodes(RPCNode, Opts, ExpectedNodes) -> 263: StateIn = disco_init(RPCNode, Opts), 264: get_nodes(RPCNode, StateIn, ExpectedNodes, false). 265: 266: get_nodes(RPCNode, StateIn, ExpectedNodes) -> 267: get_nodes(RPCNode, StateIn, ExpectedNodes, true). 268: 269: get_nodes(RPCNode, StateIn, ExpectedNodes, AlreadyRegistered) -> 270: {{ok, Nodes}, State} = disco_get_nodes(RPCNode, StateIn), 271: ?assertEqual(lists:sort(ExpectedNodes), lists:sort(Nodes)), 272: ?assertMatch(#{last_query_info := #{already_registered := AlreadyRegistered}}, State), 273: State. 274: 275: disco_init(Node, Opts) -> 276: State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]), 277: log_disco_request(?FUNCTION_NAME, Node, Opts, State), 278: State. 279: 280: disco_get_nodes(Node, State) -> 281: NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]), 282: log_disco_request(?FUNCTION_NAME, Node, State, NewState), 283: NewState. 284: 285: match_node_name(Node, SysInfo, NodeToLookup) -> 286: rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]). 287: 288: log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) -> 289: ct:log("[0] disco_init(~p,~n" ++ 290: " ~p) =~n" ++ 291: " ~p", 292: [Node, Opts, State]), 293: erlang:put({disco, Node, CN}, 1); 294: log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) -> 295: N = case erlang:get({disco, Node, CN}) of 296: undefined -> 1; 297: Int when is_integer(Int) -> Int 298: end, 299: ct:log("[~p] disco_get_nodes(~p,~n" ++ 300: " ~p) =~n" ++ 301: " ~p", 302: [N, Node, OldState, NewState]), 303: erlang:put({disco, Node, CN}, N+1). 304: 305: timestamp() -> 306: os:system_time(second). 307: 308: month_ago() -> 309: timestamp() - timer:hours(24 * 30) div 1000. 310: 311: mock_timestamp(Node, Timestamp) -> 312: ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]), 313: ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]), 314: %% Ensure that we mock 315: Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []). 316: 317: unmock_timestamp(Node) -> 318: ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]). 319: 320: unmock(Node) -> 321: rpc(Node, meck, unload, []). 322: 323: random_cluster_name(CaseName) -> 324: Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []), 325: <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>. 326: 327: insert_new(CN, BinNode, NodeNum, Address, TS) -> 328: Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]), 329: ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]), 330: Ret. 331: 332: select(CN) -> 333: Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]), 334: ct:log("select(~p) = ~p", [CN, Ret]), 335: Ret. 336: 337: update_existing(BinNode, Address, TS) -> 338: Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [BinNode, Address, TS]), 339: ct:log("select(~p, ~p, ~p) = ~p", [BinNode, Address, TS, Ret]), 340: Ret. 341: 342: delete_node_from_db(BinNode) -> 343: Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [BinNode]), 344: ct:log("delete_node_from_db(~p) = ~p", [BinNode, Ret]), 345: Ret. 346: 347: start_cets_discovery(Config) -> 348: start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)), 349: start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)), 350: force_nodes_to_see_each_other(mim(), mim2()), 351: Config. 352: 353: start_cets_discovery_with_real_ips(Config) -> 354: start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)), 355: start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)), 356: force_nodes_to_see_each_other(mim(), mim2()), 357: Config. 358: 359: start_cets_discovery_with_file_backnend(Config) -> 360: start_disco(mim(), cets_disco_spec_for_file_backend()), 361: start_disco(mim2(), cets_disco_spec_for_file_backend()), 362: Config. 363: 364: stop_cets_discovery() -> 365: ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]), 366: ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]). 367: 368: stop_and_delete_cets_discovery() -> 369: stop_cets_discovery(), 370: ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]), 371: ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]). 372: 373: stop_and_delete_cets_discovery_if_running() -> 374: case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of 375: undefined -> 376: ok; 377: _ -> 378: stop_and_delete_cets_discovery() 379: end. 380: 381: restore_default_cets_discovery() -> 382: restore_default_cets_discovery(mim()), 383: restore_default_cets_discovery(mim2()). 384: 385: restore_default_cets_discovery(Node) -> 386: case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of 387: [] -> 388: ok; 389: [Spec] -> 390: start_disco(Node, Spec) 391: end. 392: 393: cets_disco_spec(Node, IP) -> 394: DiscoOpts = #{ 395: backend_module => mongoose_cets_discovery_rdbms, 396: cluster_name => <<"mim">>, 397: node_name_to_insert => Node, 398: node_ip_binary => IP, 399: name => mongoose_cets_discovery}, 400: cets_disco_spec(DiscoOpts). 401: 402: cets_disco_spec_for_file_backend() -> 403: DiscoOpts = #{ 404: backend_module => cets_discovery_file, 405: disco_file => "/tmp/does_not_exist", 406: name => mongoose_cets_discovery}, 407: cets_disco_spec(DiscoOpts). 408: 409: cets_disco_spec(DiscoOpts) -> 410: #{ 411: id => cets_discovery, 412: start => {mongoose_cets_discovery, start_link, [DiscoOpts]}, 413: restart => temporary, 414: type => worker, 415: shutdown => infinity, 416: modules => [cets_discovery]}. 417: 418: send_check(Node) -> 419: rpc(Node, erlang, send, [mongoose_cets_discovery, check]). 420: 421: wait_for_get_nodes(Node) -> 422: ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]). 423: 424: start_disco(Node, Spec) -> 425: {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]). 426: 427: force_nodes_to_see_each_other(Node1, Node2) -> 428: send_check(Node2), 429: wait_for_get_nodes(Node2), 430: send_check(Node1), 431: wait_for_get_nodes(Node1), 432: send_check(Node2), 433: wait_for_get_nodes(Node2).