1 |
|
-module(mongoose_import_users). |
2 |
|
|
3 |
|
-export([run/1]). |
4 |
|
|
5 |
|
-define(REGISTER_WORKERS_NUM, 10). |
6 |
|
|
7 |
|
-type summary() :: #{reason() => [jid:jid() | binary()]}. |
8 |
|
-type reason() :: ok | exists | not_allowed | invalid_jid | null_password | |
9 |
|
limit_per_domain_exceeded | bad_csv. |
10 |
|
|
11 |
|
-export_type([summary/0, reason/0]). |
12 |
|
|
13 |
|
%% Imports users from the CSV file `Filename'.
%%
%% The file is opened as an erl_csv stream, ?REGISTER_WORKERS_NUM linked
%% worker processes are started, and the stream is handed to do_import/2
%% together with a queue of those workers.
%%
%% Returns `{ok, Summary}' with the merged per-reason results, or
%% `{error, file_not_found}' when `Filename' is not an existing regular file.
-spec run(file:name()) -> {ok, summary()} | {error, file_not_found}.
run(Filename) ->
    %% filelib:is_file/1 is also true for directories, which cannot be read
    %% as CSV and would only fail later on the {ok, _} match below, so only
    %% regular files are accepted here.
    case filelib:is_regular(Filename) of
        true ->
            {ok, CsvStream} = erl_csv:decode_new_s(Filename),
            Workers = spawn_link_workers(),
            WorkersQueue = queue:from_list(Workers),
            {ok, do_import(CsvStream, WorkersQueue)};
        false ->
            {error, file_not_found}
    end.
24 |
|
|
25 |
|
%% Walks the CSV stream chunk by chunk, passing every decoded chunk to
%% send_job_to_next_worker/2. Once the stream is `stream_end', each worker in
%% the queue is asked for its results and they are folded into one summary.
-spec do_import(erl_csv:csv_stream(), queue:queue()) -> summary().
do_import(stream_end, WorkerQueue) ->
    Collect = fun(WorkerPid, Summary) -> accumulate_results(WorkerPid, Summary) end,
    lists:foldl(Collect, #{}, queue:to_list(WorkerQueue));
do_import(CsvStream, WorkerQueue) ->
    {ok, Records, RestOfStream} = erl_csv:decode_s(CsvStream),
    RotatedQueue = send_job_to_next_worker(Records, WorkerQueue),
    do_import(RestOfStream, RotatedQueue).
33 |
|
|
34 |
|
%% Starts ?REGISTER_WORKERS_NUM worker processes linked to the caller.
%% Every worker runs registrator_proc/1 with the caller's pid as its manager.
-spec spawn_link_workers() -> [pid()].
spawn_link_workers() ->
    Parent = self(),
    WorkerLoop = fun() -> registrator_proc(Parent) end,
    [spawn_link(WorkerLoop) || _N <- lists:seq(1, ?REGISTER_WORKERS_NUM)].
39 |
|
|
40 |
|
%% Fetches the results of worker `Pid' and merges them into `Map'.
%% When both maps contain the same reason, the list already in `Map' is
%% followed by the list reported by the worker.
-spec accumulate_results(pid(), summary()) -> summary().
accumulate_results(Pid, Map) ->
    Concat = fun(_Reason, Collected, FromWorker) -> Collected ++ FromWorker end,
    maps:merge_with(Concat, Map, get_results_from_registrator(Pid)).
47 |
|
|
48 |
|
%% Asks worker `Pid' for its results by sending `get_result' and returns the
%% payload of the next `{result, _}' message found in the caller's mailbox.
%% There is no `after' clause: the call blocks until such a message arrives.
-spec get_results_from_registrator(pid()) -> summary().
get_results_from_registrator(Pid) ->
    Pid ! get_result,
    receive
        {result, WorkerSummary} ->
            WorkerSummary
    end.
54 |
|
|
55 |
|
%% Dispatches the records of one decoded CSV chunk to the workers in
%% round-robin order.
%%
%% `Records' is a list of CSV records, each record being a list of binary
%% fields. For every record the worker at the head of `WQueue' receives
%% `{process, Record}' and is moved to the back of the queue. An empty chunk
%% leaves the queue untouched. Returns the rotated queue.
-spec send_job_to_next_worker([[binary()]], queue:queue()) -> queue:queue().
send_job_to_next_worker([], WQueue) ->
    WQueue;
send_job_to_next_worker([Record | MoreRecords], WQueue) ->
    {{value, Worker}, Q1} = queue:out(WQueue),
    Worker ! {process, Record},
    %% Recurse so that a chunk carrying several records is fully dispatched
    %% instead of failing with function_clause.
    send_job_to_next_worker(MoreRecords, queue:in(Worker, Q1)).
62 |
|
|
63 |
|
%% Entry point of a worker process: enters the receive loop of
%% registrator_proc/2 with an empty result map.
-spec registrator_proc(pid()) -> ok.
registrator_proc(Manager) ->
    EmptySummary = #{},
    registrator_proc(Manager, EmptySummary).
66 |
|
|
67 |
|
%% Receive loop of a worker process.
%%
%% `{process, Data}' registers the record via do_register/1 and prepends the
%% resulting user to the list stored under the returned reason in `Map'.
%% `get_result' sends `{result, Map}' to `Manager' and ends the loop with `ok'.
-spec registrator_proc(pid(), summary()) -> ok.
registrator_proc(Manager, Map) ->
    receive
        {process, Data} ->
            {Reason, User} = do_register(Data),
            Map2 = maps:update_with(Reason, fun(List) -> [User | List] end, [User], Map),
            %% Must stay the last expression of this branch so the loop is a
            %% tail call and the stack does not grow with every record.
            registrator_proc(Manager, Map2);
        get_result ->
            Manager ! {result, Map},
            ok
    end.
78 |
|
|
79 |
|
%% Tries to register the user described by one CSV record.
%%
%% A record with exactly three fields is read as user, host and password:
%% a JID is built with jid:make_bare/2 and passed, with the password, to
%% ejabberd_auth:try_register/2.
%%   * `{error, invalid_jid}' -> `{invalid_jid, Line}', where Line is the
%%     record re-joined with commas;
%%   * any other `{error, Reason}' -> `{Reason, JID}';
%%   * anything else is treated as success -> `{ok, JID}'.
%% A record with any other number of fields yields `{bad_csv, Line}'.
-spec do_register([binary()]) -> {reason(), jid:jid() | binary()}.
do_register([User, Host, Password] = List) ->
    JID = jid:make_bare(User, Host),
    case ejabberd_auth:try_register(JID, Password) of
        {error, invalid_jid} -> {invalid_jid, join(List)};
        {error, Reason} -> {Reason, JID};
        _ -> {ok, JID}
    end;
do_register(List) ->
    {bad_csv, join(List)}.
89 |
|
|
90 |
|
%% Joins the fields of a CSV record back into one comma-separated binary.
%%
%% Every field is kept, including empty ones at the end of the record, so
%% `[<<"a">>, <<>>]' becomes `<<"a,">>'. An empty record yields `<<>>'.
-spec join([binary()]) -> binary().
join(Record) ->
    iolist_to_binary(lists:join(<<",">>, Record)).