./ct_report/coverage/mongoose_import_users.COVER.html

1 -module(mongoose_import_users).
2
3 -export([run/1]).
4
%% Number of registration worker processes spawned for one import run.
5 -define(REGISTER_WORKERS_NUM, 10).
6
%% Result of an import: for every outcome, the list of entries that ended with
%% it. An entry is the JID that was attempted, or the raw comma-joined CSV row
%% when the row was malformed or the JID was invalid.
7 -type summary() :: #{reason() => [jid:jid() | binary()]}.
%% Outcome of a single row. `ok' means the registration succeeded and `bad_csv'
%% means the row did not have exactly three fields; the remaining atoms are
%% error reasons passed through from ejabberd_auth:try_register/2.
%% NOTE(review): any other error reason returned by try_register/2 would also
%% end up as a key - confirm this list is exhaustive.
8 -type reason() :: ok | exists | not_allowed | invalid_jid | null_password |
9 limit_per_domain_exceeded | bad_csv.
10
11 -export_type([summary/0, reason/0]).
12
%% @doc Imports users from the CSV file `Filename'.
%%
%% Opens the file as a CSV stream, spawns the registration workers, feeds
%% them the decoded records and returns `{ok, Summary}', where `Summary'
%% groups every processed entry by its outcome.
%% Returns `{error, file_not_found}' when `Filename' does not exist or is a
%% directory. Crashes with `badmatch' if the CSV stream cannot be opened.
-spec run(file:name()) -> {ok, summary()} | {error, file_not_found}.
run(Filename) ->
    %% filelib:is_file/1 alone is also true for directories; without the
    %% extra check a directory path would crash on the stream match below
    %% instead of being reported as file_not_found.
    case filelib:is_file(Filename) andalso not filelib:is_dir(Filename) of
        true ->
            {ok, CsvStream} = erl_csv:decode_new_s(Filename),
            Workers = spawn_link_workers(),
            WorkersQueue = queue:from_list(Workers),
            {ok, do_import(CsvStream, WorkersQueue)};
        false ->
            {error, file_not_found}
    end.
24
%% @doc Drives the import loop.
%%
%% While the stream has data, decodes the next chunk and hands it to the
%% worker queue. Once the stream is exhausted (`stream_end'), asks every
%% worker for its results and merges them into a single summary.
-spec do_import(erl_csv:csv_stream(), queue:queue()) -> summary().
do_import(Stream, WorkerQueue) ->
    case Stream of
        stream_end ->
            lists:foldl(fun accumulate_results/2, #{}, queue:to_list(WorkerQueue));
        _ ->
            {ok, Rows, Rest} = erl_csv:decode_s(Stream),
            do_import(Rest, send_job_to_next_worker(Rows, WorkerQueue))
    end.
33
%% @doc Spawns ?REGISTER_WORKERS_NUM registration workers linked to the
%% calling process and returns their pids. Each worker is told the caller's
%% pid so it knows where to send its results.
-spec spawn_link_workers() -> [pid()].
spawn_link_workers() ->
    Parent = self(),
    WorkerLoop = fun() -> registrator_proc(Parent) end,
    [spawn_link(WorkerLoop) || _ <- lists:duplicate(?REGISTER_WORKERS_NUM, worker)].
39
%% @doc Fold step: fetches the results of worker `Worker' and merges them
%% into the summary accumulated so far. For an outcome present on both
%% sides, the worker's entries are appended after the accumulated ones.
-spec accumulate_results(pid(), summary()) -> summary().
accumulate_results(Worker, Acc) ->
    Append =
        fun(Reason, Entries, Summary) ->
            maps:update_with(Reason,
                             fun(Earlier) -> Earlier ++ Entries end,
                             Entries,
                             Summary)
        end,
    maps:fold(Append, Acc, get_results_from_registrator(Worker)).
47
%% @doc Asks worker `Worker' for its results and waits for the answer.
%%
%% Sends `get_result' to the worker and blocks, with no timeout, until a
%% `{result, Summary}' message arrives in the caller's mailbox. The reply
%% is not tagged with the worker's pid, so the first such message wins.
-spec get_results_from_registrator(pid()) -> summary().
get_results_from_registrator(Worker) ->
    _ = erlang:send(Worker, get_result),
    receive
        {result, Summary} ->
            Summary
    end.
54
%% @doc Distributes decoded CSV records over the workers, round-robin.
%%
%% `Records' is a list of records, each record being the list of fields of
%% one CSV row. Every record is sent as `{process, Record}' to the worker at
%% the head of `WQueue', which is then moved to the back of the queue.
%% Returns the rotated queue; an empty `Records' list leaves it unchanged.
%% Crashes with `badmatch' if `WQueue' is empty while there is a record to
%% dispatch.
-spec send_job_to_next_worker([[binary()]], queue:queue()) -> queue:queue().
send_job_to_next_worker(Records, WQueue) ->
    %% Folding (rather than matching only [] and [Record]) lets a decoded
    %% chunk carry any number of records without a function_clause crash.
    Dispatch =
        fun(Record, Queue) ->
            {{value, Worker}, Rest} = queue:out(Queue),
            Worker ! {process, Record},
            queue:in(Worker, Rest)
        end,
    lists:foldl(Dispatch, WQueue, Records).
62
%% @doc Entry point of a registration worker: starts the worker loop with
%% an empty summary. `Manager' is the process that will receive the results.
-spec registrator_proc(pid()) -> ok.
registrator_proc(Manager) ->
    registrator_proc(Manager, #{}).

%% @doc Worker loop.
%%
%% On `{process, Data}' registers the user described by `Data', records the
%% entry under its outcome in `Map' (newest first) and keeps looping.
%% On `get_result' sends `{result, Map}' to `Manager' and terminates with
%% `ok'. Any other message is left in the mailbox.
-spec registrator_proc(pid(), summary()) -> ok.
registrator_proc(Manager, Map) ->
    receive
        {process, Data} ->
            {Reason, User} = do_register(Data),
            Map2 = maps:update_with(Reason, fun(List) -> [User | List] end, [User], Map),
            %% Must stay the last expression of this branch so the loop is
            %% tail-recursive and the stack does not grow per record.
            registrator_proc(Manager, Map2);
        get_result ->
            Manager ! {result, Map},
            ok
    end.
78
%% @doc Tries to register the user described by one CSV record.
%%
%% A well-formed record has exactly three fields: user, host and password.
%% Returns `{Outcome, Entry}':
%%   - `{ok, JID}' when registration succeeded,
%%   - `{invalid_jid, Row}' when the JID was rejected as invalid; the raw
%%     comma-joined row is reported because no usable JID exists,
%%   - `{Reason, JID}' for any other registration error,
%%   - `{bad_csv, Row}' when the record does not have exactly three fields.
%% The spec names `jid:jid()' (not `jid:user()') because the whole JID built
%% by jid:make_bare/2 is returned, matching the summary() type.
-spec do_register([binary()]) -> {reason(), jid:jid() | binary()}.
do_register([User, Host, Password] = List) ->
    JID = jid:make_bare(User, Host),
    case ejabberd_auth:try_register(JID, Password) of
        {error, invalid_jid} -> {invalid_jid, join(List)};
        {error, Reason} -> {Reason, JID};
        _ -> {ok, JID}
    end;
do_register(List) ->
    {bad_csv, join(List)}.
89
%% @doc Joins the fields of a CSV record back into one comma-separated
%% binary, e.g. `[<<"a">>, <<"b">>]' becomes `<<"a,b">>'.
%%
%% Empty fields are preserved, including trailing ones, so the result has
%% exactly one comma between every pair of adjacent fields. An empty record
%% yields `<<>>'.
-spec join([binary()]) -> binary().
join(Record) ->
    %% Build an iolist and flatten once, instead of re-copying the
    %% accumulator for every field.
    iolist_to_binary(lists:join(<<",">>, Record)).
Line Hits Source