%% @doc Common Test suite for CETS discovery backends.
%%
%% Covers two node-discovery backends:
%%  - cets_discovery_file: reads the node list from a text file
%%  - mongoose_cets_discovery_rdbms: registers and discovers nodes via RDBMS
%% All test logic executes on the remote mim/mim2 nodes through rpc/4.
-module(cets_disco_SUITE).
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [mim/0, mim2/0, rpc/4]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").


%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
    [{group, file}, {group, rdbms}].

groups() ->
    [{file, [], file_cases()},
     {rdbms, [], rdbms_cases()}].

file_cases() ->
    [file_backend].

rdbms_cases() ->
    [rdbms_backend,
     rdbms_backend_supports_cluster_change,
     rdbms_backend_supports_auto_cleaning,
     rdbms_backend_node_doesnt_remove_itself,
     rdbms_backend_db_queries,
     rdbms_backend_publishes_node_ip,
     no_record_for_node,
     no_ip_in_db,
     epmd_just_returns_ip_from_db,
     address_please,
     address_please_returns_ip,
     address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
     address_please_returns_ip_127_0_0_1_from_db].

suite() ->
    %% The whole suite requires both remote nodes to be reachable over RPC.
    distributed_helper:require_rpc_nodes([mim, mim2]).

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% RDBMS cases must not race with a discovery process left over from the
%% default configuration, so any running instance is stopped here and
%% restored in end_per_group/2.
init_per_group(rdbms, Config) ->
    %% Proceed when CT is not running (presumably an interactive-shell run;
    %% TODO confirm against ct_helper) or when RDBMS is enabled for the
    %% host type; otherwise skip the whole group.
    case not ct_helper:is_ct_running()
         orelse mongoose_helper:is_rdbms_enabled(domain_helper:host_type()) of
        false -> {skip, rdbms_or_ct_not_running};
        true ->
            stop_and_delete_cets_discovery_if_running(),
            Config
    end;
init_per_group(_, Config) ->
    Config.

end_per_group(rdbms, Config) ->
    %% Bring back the discovery process removed in init_per_group/2.
    restore_default_cets_discovery(),
    Config;
end_per_group(_, Config) ->
    Config.
%% Per-testcase setup: the three address_please_* cases each need a running
%% cets_discovery process, started with a case-specific configuration.
init_per_testcase(address_please_returns_ip, Config) ->
    start_cets_discovery(Config);
init_per_testcase(address_please_returns_ip_fallbacks_to_resolve_with_file_backend, Config) ->
    start_cets_discovery_with_file_backnend(Config);
init_per_testcase(address_please_returns_ip_127_0_0_1_from_db, Config) ->
    start_cets_discovery_with_real_ips(Config);
init_per_testcase(_CaseName, Config) ->
    Config.

%% Cases that started a discovery process stop it; every other case unloads
%% any meck mocks that may still be installed on both remote nodes.
end_per_testcase(CaseName, Config) ->
    DiscoCases = [address_please_returns_ip,
                  address_please_returns_ip_fallbacks_to_resolve_with_file_backend,
                  address_please_returns_ip_127_0_0_1_from_db],
    case lists:member(CaseName, DiscoCases) of
        true ->
            stop_cets_discovery(),
            Config;
        false ->
            unmock(mim()),
            unmock(mim2())
    end.

%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

%% The file backend reads the node list from a plain text file.
file_backend(Config) ->
    NodesFile = filename:join(?config(mim_data_dir, Config), "nodes.txt"),
    BackendState = rpc(mim(), cets_discovery_file, init, [#{disco_file => NodesFile}]),
    {{ok, DiscoveredNodes}, _NewState} =
        rpc(mim(), cets_discovery_file, get_nodes, [BackendState]),
    ?assertEqual(lists:sort(['node1@localhost', 'node2@otherhost']),
                 lists:sort(DiscoveredNodes)).

%% Two nodes registering under the same cluster name discover each other.
rdbms_backend(_Config) ->
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    OptsA = #{cluster_name => ClusterName, node_name_to_insert => <<"test1">>},
    OptsB = #{cluster_name => ClusterName, node_name_to_insert => <<"test2">>},

    %% First node registers into an empty cluster
    init_and_get_nodes(mim(), OptsA, []),

    %% "test2" node can see "test1" on initial registration
    StateB = init_and_get_nodes(mim2(), OptsB, [test1]),

    %% "test2" node can see "test1" on update
    get_nodes(mim2(), StateB, [test1, test2]).
%% Moving a node to a different cluster name must disconnect it from its old
%% peers; once both nodes share the new name they reconnect.
rdbms_backend_supports_cluster_change(_Config) ->
    CN1 = random_cluster_name(?FUNCTION_NAME),
    CN2 = <<CN1/binary, "_new">>,
    Opts1 = #{cluster_name => CN1, node_name_to_insert => <<"test1">>},
    Opts2 = #{cluster_name => CN1, node_name_to_insert => <<"test2">>},

    %% Nodes test1 and test2 are in CN1, and they become connected
    State1 = init_and_get_nodes(mim(), Opts1, []),
    State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
    get_nodes(mim(), State1, [test1, test2]),

    %% Node test1 moves to CN2, and the nodes are disconnected
    NewState1 = init_and_get_nodes(mim(), Opts1#{cluster_name := CN2}, []),
    get_nodes(mim2(), State2, [test2]),
    NewState1A = get_nodes(mim(), NewState1, [test1]),

    %% Node test2 moves to CN2, and the nodes are connected again
    init_and_get_nodes(mim2(), Opts2#{cluster_name := CN2}, [test1]),
    get_nodes(mim(), NewState1A, [test1, test2]).

%% A registration row with a stale timestamp is removed by another node's
%% cleaning pass.  The timestamp source is mocked only on mim(), so mim2()
%% writes fresh timestamps and treats mim()'s row as outdated.
rdbms_backend_supports_auto_cleaning(_Config) ->
    Timestamp = month_ago(),
    mock_timestamp(mim(), Timestamp),
    CN = random_cluster_name(?FUNCTION_NAME),
    Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
    Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},

    %% test1 row is written with an old (mocked) timestamp
    %% (Timestamp is already bound, so ?assertMatch checks for equality)
    State1 = init_and_get_nodes(mim(), Opts1, []),
    ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),
    State1A = get_nodes(mim(), State1, [test1]),
    ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),

    %% test2 would clean test1 registration
    %% We don't mock on mim2 node, so timestamps would differ
    State2 = init_and_get_nodes(mim2(), Opts2, []),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, [<<"test1">>]}}}, State2),
    State2A = get_nodes(mim2(), State2, [test2]),
    %% Only test2's own row should remain after the cleaning
    #{last_query_info := #{last_rows := SelectedRows}} = State2A,
    ?assertMatch(1, length(SelectedRows)).

%% A node must never clean its own row even if its stored timestamp is old:
%% the update performed on each get_nodes pass refreshes it first.
rdbms_backend_node_doesnt_remove_itself(_Config) ->
    Timestamp = month_ago(),
    mock_timestamp(mim(), Timestamp),
    CN = random_cluster_name(?FUNCTION_NAME),
    Opts1 = #{cluster_name => CN, node_name_to_insert => <<"test1">>},
    Opts2 = #{cluster_name => CN, node_name_to_insert => <<"test2">>},

    %% test1 row is written with an old (mocked) timestamp
    State1 = init_and_get_nodes(mim(), Opts1, []),
    ?assertMatch(#{last_query_info := #{timestamp := Timestamp}}, State1),

    unmock_timestamp(mim()),
    %% test1 row is not removed and timestamp is updated
    State1A = get_nodes(mim(), State1, [test1]),
    ?assertNotMatch(#{last_query_info := #{timestamp := Timestamp}}, State1A),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State1A),

    %% The second node also finds nothing to clean
    State2 = init_and_get_nodes(mim2(), Opts2, [test1]),
    ?assertMatch(#{last_query_info := #{run_cleaning_result := {removed, []}}}, State2).
%% Exercises the backend's raw DB queries directly: uniqueness constraints on
%% insert, timestamp updates, and row deletion.
rdbms_backend_db_queries(_Config) ->
    CN = random_cluster_name(?FUNCTION_NAME),
    TS = rpc(mim(), mongoose_rdbms_timestamp, select, []),
    TS2 = TS + 100,

    %% insertion fails if node name or node num is already added for the cluster
    ?assertEqual({updated, 1}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
    ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 1, <<>>, TS)),
    ?assertMatch({error, _}, insert_new(CN, <<"testA">>, 2, <<>>, TS)),
    ?assertMatch({error, _}, insert_new(CN, <<"testB">>, 1, <<>>, TS)),
    ?assertEqual({updated, 1}, insert_new(CN, <<"testB">>, 2, <<>>, TS)),

    %% insertion fails if node is a member of another cluster
    ?assertMatch({error, _}, insert_new(<<"my-cluster">>, <<"testA">>, 1, <<>>, TS)),

    %% update of the timestamp works correctly
    %% (rows are {NodeName, NodeNum, Address, Timestamp})
    {selected, SelectedNodes1} = select(CN),
    ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS}, {<<"testB">>, 2, <<>>, TS}]),
                 lists:sort(SelectedNodes1)),
    ?assertEqual({updated, 1}, update_existing(<<"testA">>, <<>>, TS2)),
    {selected, SelectedNodes2} = select(CN),
    ?assertEqual(lists:sort([{<<"testA">>, 1, <<>>, TS2}, {<<"testB">>, 2, <<>>, TS}]),
                 lists:sort(SelectedNodes2)),

    %% node removal works correctly
    ?assertEqual({updated, 1}, delete_node_from_db(<<"testA">>)),
    ?assertEqual({selected, [{<<"testB">>, 2, <<>>, TS}]}, select(CN)).
%% A node's registered IP can be resolved by its peer via match_node_name/3.
rdbms_backend_publishes_node_ip(_Config) ->
    %% get_pairs would return only real available nodes, so use the real node names
    #{node := NodeA} = mim(),
    #{node := NodeB} = mim2(),
    NodeABin = atom_to_binary(NodeA),
    NodeBBin = atom_to_binary(NodeB),
    ClusterName = random_cluster_name(?FUNCTION_NAME),
    MakeOpts = fun(NodeBin) ->
                       #{cluster_name => ClusterName,
                         node_name_to_insert => NodeBin,
                         node_ip_binary => <<"127.0.0.1">>}
               end,
    StateA0 = disco_init(mim(), MakeOpts(NodeABin)),
    StateB0 = disco_init(mim2(), MakeOpts(NodeBBin)),
    %% Poll twice on each node so both registrations become visible everywhere
    {{ok, _NodesA1}, StateA1} = disco_get_nodes(mim(), StateA0),
    {{ok, _NodesB1}, StateB1} = disco_get_nodes(mim2(), StateB0),
    {{ok, _NodesA2}, StateA2} = disco_get_nodes(mim(), StateA1),
    {{ok, _NodesB2}, StateB2} = disco_get_nodes(mim2(), StateB1),
    {ok, {127, 0, 0, 1}} = match_node_name(mim2(), StateB2, NodeABin),
    {ok, {127, 0, 0, 1}} = match_node_name(mim(), StateA2, NodeBBin).

%% Looking up a node that was never registered returns a tagged error.
no_record_for_node(_Config) ->
    UnknownNode = <<"mongoose@badhost">>,
    BackendState = #{address_pairs => #{}},
    {error, {no_record_for_node, UnknownNode}} =
        match_node_name(mim(), BackendState, UnknownNode),
    ok.

%% A registered node with an empty IP column yields a distinct error.
no_ip_in_db(_Config) ->
    NodeWithoutIp = <<"mongoose@noiphost">>,
    BackendState = #{address_pairs => #{NodeWithoutIp => <<>>}},
    {error, {no_ip_in_db, NodeWithoutIp}} =
        match_node_name(mim(), BackendState, NodeWithoutIp),
    ok.

%% The stored IP is returned verbatim, with no DNS resolution involved.
epmd_just_returns_ip_from_db(_Config) ->
    NodeBin = <<"mongoose@noepmdhost">>,
    %% IP from a test range
    BackendState = #{address_pairs => #{NodeBin => <<"192.0.2.1">>}},
    {ok, {192, 0, 2, 1}} = match_node_name(mim(), BackendState, NodeBin).

%% An unknown host with no DB record falls through to DNS and fails.
address_please(_Config) ->
    {error, nxdomain} =
        rpc(mim(), mongoose_epmd, address_please, ["mongooseim", "badbadhost", inet]).
%% Fixed: these three cases bound Config but never used it, triggering
%% unused-variable warnings; renamed to _Config for consistency with the
%% other test cases in this suite.

%% The discovery started in init_per_testcase publishes testmim2's IP,
%% so address_please resolves it from the DB instead of DNS.
address_please_returns_ip(_Config) ->
    Res = rpc(mim(), mongoose_epmd, address_please, ["testmim2", "localhost", inet]),
    Info = rpc(mim(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {192, 168, 115, 112}} = Res.

%% With the file backend there is no IP in the DB, so address_please
%% falls back to regular name resolution.
address_please_returns_ip_fallbacks_to_resolve_with_file_backend(_Config) ->
    Res = rpc(mim2(), mongoose_epmd, address_please, ["testmim1", "localhost", inet]),
    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {127, 0, 0, 1}} = Res.

%% A loopback address stored in the DB is returned as-is.
address_please_returns_ip_127_0_0_1_from_db(_Config) ->
    Res = rpc(mim2(), mongoose_epmd, address_please, ["node1", "localhost", inet]),
    Info = rpc(mim2(), cets_discovery, system_info, [mongoose_cets_discovery]),
    ct:log("system_info ~p", [Info]),
    {ok, {127, 0, 0, 1}} = Res.

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Initialise the RDBMS backend on RPCNode and assert the first get_nodes
%% pass discovers exactly ExpectedNodes (before this node is registered).
init_and_get_nodes(RPCNode, Opts, ExpectedNodes) ->
    StateIn = disco_init(RPCNode, Opts),
    get_nodes(RPCNode, StateIn, ExpectedNodes, false).

%% Run a get_nodes pass expecting the node to be already registered.
get_nodes(RPCNode, StateIn, ExpectedNodes) ->
    get_nodes(RPCNode, StateIn, ExpectedNodes, true).

%% Run one get_nodes pass and assert both the discovered node set and the
%% already_registered flag recorded in last_query_info.  Returns the new
%% backend state for chaining.
get_nodes(RPCNode, StateIn, ExpectedNodes, AlreadyRegistered) ->
    {{ok, Nodes}, State} = disco_get_nodes(RPCNode, StateIn),
    ?assertEqual(lists:sort(ExpectedNodes), lists:sort(Nodes)),
    ?assertMatch(#{last_query_info := #{already_registered := AlreadyRegistered}}, State),
    State.

%% Call the backend's init/1 on a remote node and log the exchange.
disco_init(Node, Opts) ->
    State = rpc(Node, mongoose_cets_discovery_rdbms, init, [Opts]),
    log_disco_request(?FUNCTION_NAME, Node, Opts, State),
    State.
%% Call the backend's get_nodes/1 on a remote node and log the exchange.
%% Returns {{ok, Nodes}, NewState} as produced by the backend.
disco_get_nodes(Node, State) ->
    NewState = rpc(Node, mongoose_cets_discovery_rdbms, get_nodes, [State]),
    log_disco_request(?FUNCTION_NAME, Node, State, NewState),
    NewState.

%% Resolve a node name to an IP tuple using the backend state on a remote node.
match_node_name(Node, SysInfo, NodeToLookup) ->
    rpc(Node, mongoose_epmd, match_node_name, [SysInfo, NodeToLookup]).

%% Pretty-print backend requests into the CT log.  A per-{Node, ClusterName}
%% counter is kept in the process dictionary so consecutive get_nodes calls
%% are numbered in the log output.
log_disco_request(disco_init, Node, #{cluster_name := CN} = Opts, State) ->
    ct:log("[0] disco_init(~p,~n" ++
           " ~p) =~n" ++
           " ~p",
           [Node, Opts, State]),
    erlang:put({disco, Node, CN}, 1);
log_disco_request(disco_get_nodes, Node, #{cluster_name := CN} = OldState, NewState) ->
    N = case erlang:get({disco, Node, CN}) of
            undefined -> 1;
            Int when is_integer(Int) -> Int
        end,
    ct:log("[~p] disco_get_nodes(~p,~n" ++
           " ~p) =~n" ++
           " ~p",
           [N, Node, OldState, NewState]),
    erlang:put({disco, Node, CN}, N+1).

%% Current wall-clock time in seconds (same unit as the backend's timestamps).
timestamp() ->
    os:system_time(second).

%% A timestamp 30 days in the past, in seconds.  Note the precedence:
%% div binds tighter than -, so this is timestamp() minus
%% (milliseconds-in-30-days div 1000) = timestamp() - 2592000.
month_ago() ->
    timestamp() - timer:hours(24 * 30) div 1000.

%% Replace mongoose_rdbms_timestamp:select/0 on a remote node with a constant,
%% so rows are written with a controlled (old) timestamp.
mock_timestamp(Node, Timestamp) ->
    ok = rpc(Node, meck, new, [mongoose_rdbms_timestamp, [passthrough, no_link]]),
    ok = rpc(Node, meck, expect, [mongoose_rdbms_timestamp, select, 0, Timestamp]),
    %% Ensure that we mock
    Timestamp = rpc(Node, mongoose_rdbms_timestamp, select, []).

%% Remove the timestamp mock installed by mock_timestamp/2.
unmock_timestamp(Node) ->
    ok = rpc(Node, meck, unload, [mongoose_rdbms_timestamp]).

%% Unload all meck mocks on a remote node (safe when none are installed).
unmock(Node) ->
    rpc(Node, meck, unload, []).

%% Build a unique cluster name per test run so cases never share DB rows.
random_cluster_name(CaseName) ->
    Rand = rpc(mim(), mongoose_bin, gen_from_crypto, []),
    <<"big_test_", (atom_to_binary(CaseName))/binary, "_", Rand/binary>>.

%% Insert a fresh registration row; logs arguments and result to the CT log.
insert_new(CN, BinNode, NodeNum, Address, TS) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, insert_new, [CN, BinNode, NodeNum, Address, TS]),
    ct:log("insert_new(~p, ~p, ~p, ~p, ~p) = ~p", [CN, BinNode, NodeNum, Address, TS, Ret]),
    Ret.
%% Select all registration rows for a cluster; logs the call to the CT log.
select(CN) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, select, [CN]),
    ct:log("select(~p) = ~p", [CN, Ret]),
    Ret.

%% Update an existing node's address and timestamp.
update_existing(BinNode, Address, TS) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, update_existing, [BinNode, Address, TS]),
    %% Fixed: the log line previously said "select", mislabelling this call
    %% in the CT log.
    ct:log("update_existing(~p, ~p, ~p) = ~p", [BinNode, Address, TS, Ret]),
    Ret.

%% Delete a node's registration row; logs the call to the CT log.
delete_node_from_db(BinNode) ->
    Ret = rpc(mim(), mongoose_cets_discovery_rdbms, delete_node_from_db, [BinNode]),
    ct:log("delete_node_from_db(~p) = ~p", [BinNode, Ret]),
    Ret.

%% Start RDBMS-backed discovery on both nodes with fixed test IPs and make
%% sure each node has discovered the other.
start_cets_discovery(Config) ->
    start_disco(mim(), cets_disco_spec(<<"testmim1@localhost">>, <<"192.168.115.111">>)),
    start_disco(mim2(), cets_disco_spec(<<"testmim2@localhost">>, <<"192.168.115.112">>)),
    force_nodes_to_see_each_other(mim(), mim2()),
    Config.

%% Same as start_cets_discovery/1 but registering loopback addresses.
start_cets_discovery_with_real_ips(Config) ->
    start_disco(mim(), cets_disco_spec(<<"node1@localhost">>, <<"127.0.0.1">>)),
    start_disco(mim2(), cets_disco_spec(<<"node2@localhost">>, <<"127.0.0.1">>)),
    force_nodes_to_see_each_other(mim(), mim2()),
    Config.

%% Start file-backed discovery on both nodes (no IPs are published).
%% NOTE(review): "backnend" is a typo, kept because init_per_testcase/2
%% calls this function by this exact name.
start_cets_discovery_with_file_backnend(Config) ->
    start_disco(mim(), cets_disco_spec_for_file_backend()),
    start_disco(mim2(), cets_disco_spec_for_file_backend()),
    Config.

%% Stop the discovery child on both nodes (the child spec remains).
stop_cets_discovery() ->
    ok = rpc(mim(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]),
    ok = rpc(mim2(), supervisor, terminate_child, [ejabberd_sup, cets_discovery]).

%% Stop the discovery child and remove its spec from the supervisor,
%% so a differently-configured instance can be started later.
stop_and_delete_cets_discovery() ->
    stop_cets_discovery(),
    ok = rpc(mim(), supervisor, delete_child, [ejabberd_sup, cets_discovery]),
    ok = rpc(mim2(), supervisor, delete_child, [ejabberd_sup, cets_discovery]).

%% Stop and delete the discovery child only if the process is running.
stop_and_delete_cets_discovery_if_running() ->
    case rpc(mim(), erlang, whereis, [mongoose_cets_discovery]) of
        undefined ->
            ok;
        _ ->
            stop_and_delete_cets_discovery()
    end.
%% Re-install the default discovery child on both nodes after the rdbms
%% group is done (undoes stop_and_delete_cets_discovery_if_running/0).
restore_default_cets_discovery() ->
    restore_default_cets_discovery(mim()),
    restore_default_cets_discovery(mim2()).

restore_default_cets_discovery(Node) ->
    %% An empty list means the node's config defines no discovery child.
    case rpc(Node, mongoose_cets_discovery, supervisor_specs, []) of
        [] -> ok;
        [ChildSpec] -> start_disco(Node, ChildSpec)
    end.

%% Child spec for an RDBMS-backed discovery registering Node under the
%% given IP in the "mim" cluster.
cets_disco_spec(Node, IP) ->
    cets_disco_spec(#{name => mongoose_cets_discovery,
                      backend_module => mongoose_cets_discovery_rdbms,
                      cluster_name => <<"mim">>,
                      node_name_to_insert => Node,
                      node_ip_binary => IP}).

%% Child spec for a file-backed discovery pointing at a missing file.
cets_disco_spec_for_file_backend() ->
    cets_disco_spec(#{name => mongoose_cets_discovery,
                      backend_module => cets_discovery_file,
                      disco_file => "/tmp/does_not_exist"}).

cets_disco_spec(DiscoOpts) ->
    #{id => cets_discovery,
      start => {mongoose_cets_discovery, start_link, [DiscoOpts]},
      restart => temporary,
      type => worker,
      shutdown => infinity,
      modules => [cets_discovery]}.

%% Ask the discovery process on Node to refresh its node list now.
send_check(Node) ->
    rpc(Node, erlang, send, [mongoose_cets_discovery, check]).

%% Block until the discovery process on Node finishes a get_nodes pass.
wait_for_get_nodes(Node) ->
    ok = rpc(Node, cets_discovery, wait_for_get_nodes, [mongoose_cets_discovery, 5000]).

%% Start a supervisor child on a remote node; crashes unless it starts.
start_disco(Node, Spec) ->
    {ok, _} = rpc(Node, supervisor, start_child, [ejabberd_sup, Spec]).

%% Trigger refresh rounds in an order that guarantees each node has seen
%% the other's registration: Node2, then Node1, then Node2 again.
force_nodes_to_see_each_other(Node1, Node2) ->
    check_and_wait(Node2),
    check_and_wait(Node1),
    check_and_wait(Node2).

%% One synchronous refresh round on a single node.
check_and_wait(Node) ->
    send_check(Node),
    wait_for_get_nodes(Node).