./ct_report/coverage/mongoose_import_users.COVER.html

1 -module(mongoose_import_users).
2
3 -export([run/1]).
4
5 -define(REGISTER_WORKERS_NUM, 10).
6
7 -type summary() :: #{reason() => [jid:jid() | binary()]}.
8 -type reason() :: ok | exists | not_allowed | invalid_jid | null_password |
9 limit_per_domain_exceeded | bad_csv.
10
11 -export_type([summary/0, reason/0]).
12
%% @doc Import users from the CSV file `Filename'.
%%
%% Returns `{error, file_not_found}' when `filelib:is_file/1' says the path
%% does not exist. Otherwise a CSV stream is opened on the file, the linked
%% registration workers are started, and the merged per-reason summary
%% produced by `do_import/2' is returned as `{ok, Summary}'.
%% A stream that cannot be opened fails the `{ok, _}' match and crashes.
-spec run(file:name()) -> {ok, summary()} | {error, file_not_found}.
run(Filename) ->
    case filelib:is_file(Filename) of
        false ->
            {error, file_not_found};
        true ->
            {ok, Stream} = erl_csv:decode_new_s(Filename),
            Queue = queue:from_list(spawn_link_workers()),
            Summary = do_import(Stream, Queue),
            {ok, Summary}
    end.
24
%% @doc Drive the import until the CSV stream is exhausted.
%%
%% Each step decodes the next chunk of the stream and hands it to
%% `send_job_to_next_worker/2', which returns the rotated worker queue.
%% When the stream is `stream_end', the results of every worker in the queue
%% are collected and folded into a single summary map.
-spec do_import(erl_csv:csv_stream(), queue:queue()) -> summary().
do_import(stream_end, Workers) ->
    lists:foldl(fun accumulate_results/2, #{}, queue:to_list(Workers));
do_import(CsvStream, Workers) ->
    {ok, Rows, Rest} = erl_csv:decode_s(CsvStream),
    do_import(Rest, send_job_to_next_worker(Rows, Workers)).
33
%% @doc Start `?REGISTER_WORKERS_NUM' registration workers linked to the caller.
%%
%% Every worker runs `registrator_proc/1' with the calling process as its
%% manager, i.e. the process it will send its results to.
-spec spawn_link_workers() -> [pid()].
spawn_link_workers() ->
    Parent = self(),
    StartWorker = fun(_Index) ->
                          spawn_link(fun() -> registrator_proc(Parent) end)
                  end,
    lists:map(StartWorker, lists:seq(1, ?REGISTER_WORKERS_NUM)).
39
%% @doc Fetch the summary of worker `Pid' and merge it into `Map'.
%%
%% When both maps contain the same reason, the two lists are concatenated,
%% with the entries already in `Map' first.
-spec accumulate_results(pid(), summary()) -> summary().
accumulate_results(Pid, Map) ->
    Concat = fun(_Reason, Collected, FromWorker) -> Collected ++ FromWorker end,
    maps:merge_with(Concat, Map, get_results_from_registrator(Pid)).
47
%% @doc Ask worker `Pid' for its summary and wait for the answer.
%%
%% Sends `get_result' to the worker and blocks until a `{result, Summary}'
%% message arrives in the caller's mailbox. There is no timeout.
-spec get_results_from_registrator(pid()) -> summary().
get_results_from_registrator(Pid) ->
    _ = erlang:send(Pid, get_result),
    receive
        {result, Summary} ->
            Summary
    end.
54
%% @doc Dispatch every decoded CSV record to a worker, round-robin.
%%
%% `Records' is the list of records produced by one decode step; each record
%% is itself a list of binary fields. An empty list leaves the queue untouched.
%% For each record the worker at the front of the queue receives
%% `{process, Record}' and is moved to the back. Returns the rotated queue.
-spec send_job_to_next_worker([[binary()]], queue:queue()) -> queue:queue().
send_job_to_next_worker(Records, WQueue) ->
    lists:foldl(fun dispatch_record/2, WQueue, Records).

%% Send one record to the worker at the front of the queue and rotate the queue.
-spec dispatch_record([binary()], queue:queue()) -> queue:queue().
dispatch_record(Record, WQueue) ->
    %% The match crashes on an empty queue: there must always be a worker.
    {{value, Worker}, Rest} = queue:out(WQueue),
    Worker ! {process, Record},
    queue:in(Worker, Rest).
62
%% @doc Entry point of a registration worker.
%%
%% Starts the worker's receive loop with an empty summary; `Manager' is the
%% process the results are eventually sent to.
-spec registrator_proc(pid()) -> ok.
registrator_proc(Manager) ->
    EmptySummary = maps:new(),
    registrator_proc(Manager, EmptySummary).
66
%% @doc Receive loop of a registration worker.
%%
%% `{process, Data}' registers the record via `do_register/1' and files the
%% returned user under the returned reason in `Map'.
%% `get_result' sends `{result, Map}' to `Manager' and ends the loop with `ok'.
-spec registrator_proc(pid(), summary()) -> ok.
registrator_proc(Manager, Map) ->
    receive
        {process, Data} ->
            {Reason, User} = do_register(Data),
            Map2 = maps:update_with(Reason, fun(List) -> [User | List] end, [User], Map),
            %% Must stay the last expression of this branch: a tail call keeps
            %% the worker's stack constant however many records it processes.
            registrator_proc(Manager, Map2);
        get_result ->
            Manager ! {result, Map},
            ok
    end.
78
%% @doc Try to register the user described by one CSV record.
%%
%% A record with exactly three fields is read as `[User, Host, Password]'.
%% The result is `{Reason, Subject}':
%%   - `{invalid_jid, Row}' when registration reports an invalid JID; `Row' is
%%     the original record joined back into one comma-separated binary;
%%   - `{Reason, JID}' for any other `{error, Reason}' from registration;
%%   - `{ok, JID}' for every other return value of `ejabberd_auth:try_register/2'.
%% A record of any other shape is reported as `{bad_csv, Row}'.
-spec do_register([binary()]) -> {reason(), jid:jid() | binary()}.
do_register([User, Host, Password] = List) ->
    JID = jid:make_bare(User, Host),
    case ejabberd_auth:try_register(JID, Password) of
        {error, invalid_jid} -> {invalid_jid, join(List)};
        {error, Reason} -> {Reason, JID};
        _ -> {ok, JID}
    end;
do_register(List) ->
    {bad_csv, join(List)}.
89
%% @doc Join the fields of a record into one comma-separated binary.
%%
%% Every field is kept, including empty ones, so `[<<"a">>, <<"b">>, <<>>]'
%% yields `<<"a,b,">>'. An empty record yields `<<>>'.
-spec join([binary()]) -> binary().
join(Record) ->
    iolist_to_binary(lists:join(<<",">>, Record)).
Line Hits Source