Skip to content

Commit

Permalink
feat: task
Browse files Browse the repository at this point in the history
  • Loading branch information
dawnwinterLiu committed Apr 19, 2024
1 parent 9ed47f6 commit 03e0e99
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 86 deletions.
9 changes: 8 additions & 1 deletion apps/dgiot/src/transport/dgiot_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,14 @@ init_ets() ->
subscribe_route_key(Topics, Type, SessionToken) ->
unsubscribe_route_key(SessionToken, Type),
lists:foldl(fun(X, Acc) ->
dgiot_mqtt:subscribe_mgmt(SessionToken, X),
case dgiot_data:get({dlink_client, SessionToken}) of
not_find ->
pass;
Clients ->
lists:foldl(fun(Client, _) ->
dgiot_mqtt:subscribe_mgmt(Client, X)
end, {}, Clients)
end,
Acc ++ [X]
end, [], Topics),
dgiot_data:insert(?DGIOT_ROUTE_KEY, {SessionToken, Type}, Topics).
Expand Down
4 changes: 2 additions & 2 deletions apps/dgiot_api/src/handler/dgiot_data_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ do_request(post_export_data, #{<<"classname">> := Name} = Body, #{<<"sessionToke
end;

%% 导入物模型
do_request(post_import_wmxdata, #{<<"objectId">> := ProductId, <<"file">> := File} = Args, _Context, _Req) ->
io:format("~s ~p Args =~p.~n", [?FILE, ?LINE, Args]),
do_request(post_import_wmxdata, #{<<"objectId">> := ProductId, <<"file">> := File} = _Args, _Context, _Req) ->
%% io:format("~s ~p Args =~p.~n", [?FILE, ?LINE, Args]),
AtomName = dgiot_csv:save_csv_ets(File),
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}),
NewProperties = dgiot_csv:post_properties(Things),
Expand Down
10 changes: 5 additions & 5 deletions apps/dgiot_device/src/utils/dgiot_product_csv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
-include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").

-export([read_csv/3, get_products/1, create_product/4, create_device/3, post_thing/3, get_CategoryId/1, get_channelAcl/1]).
-export([read_csv/4, get_products/1, create_product/5, create_device/3, post_thing/3, get_CategoryId/1, get_channelAcl/1]).
-export([get_max_addrs/1]).

%% dgiot_product_csv:read_csv(<<"8cff09f988">>, <<"modbustcp">>).
%% dgiot_utils:save_csv_ets(<<"/dgiot_file/product/csv/modbustcp.csv">>)
read_csv(ChannelId, FilePath, Is_refresh) ->
read_csv(ChannelId, FilePath, Is_refresh, Is_shard) ->
FileName = dgiot_csv:save_csv_ets(?MODULE, FilePath),
spawn(fun() ->
Productmap = dgiot_product_csv:get_products(FileName),
TdChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"TD">>, <<"TD资源通道"/utf8>>),
{Devicemap, ProductIds} = dgiot_product_csv:create_product(ChannelId, FileName, Productmap, TdChannelId),
{Devicemap, ProductIds} = dgiot_product_csv:create_product(ChannelId, FileName, Productmap, TdChannelId, Is_shard),
dgiot_product_csv:create_device(FileName, Devicemap, ProductIds),
timer:sleep(1000),
dgiot_product_csv:post_thing(FileName, ProductIds, Is_refresh)
Expand All @@ -48,7 +48,7 @@ get_products(FileName) ->
Acc#{ProductName => Devices}
end, #{}, Products).

create_product(ChannelId, FileName, Productmap, TdChannelId) ->
create_product(ChannelId, FileName, Productmap, TdChannelId, Is_shard) ->
AtomName = dgiot_utils:to_atom(FileName),
maps:fold(fun(ProductName, DeviceNames, {Acc, Acc2}) ->
Types = ets:match(AtomName, {'_', [ProductName, '$1', '$2' | '_']}),
Expand Down Expand Up @@ -79,7 +79,7 @@ create_product(ChannelId, FileName, Productmap, TdChannelId) ->
case Result of
{ok, ProductId} ->
%% dgiot_data:insert(AtomName, ProductId, ProductName),
dgiot_data:insert({shard_storage, ProductId}, true),
dgiot_data:insert({shard_storage, ProductId}, Is_shard),
Devicemap =
lists:foldl(fun(DeviceName1, Acc1) ->
Acc1#{DeviceName1 => ProductId}
Expand Down
12 changes: 11 additions & 1 deletion apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
, description/0
]).

save_client(<<Token:34/binary, _Type/binary>> = ClientId) ->
case dgiot_data:get({dlink_client, Token}) of
not_find ->
dgiot_data:insert({dlink_client, Token}, [ClientId]);
ClientIds ->
New_ClientIds = dgiot_utils:unique_2(ClientIds ++ [ClientId]),
dgiot_data:insert({dlink_client, Token}, New_ClientIds)
end.

check(#{peerhost := PeerHost, username := <<"dgiot">>}, AuthResult, _) when PeerHost == {127, 0, 0, 1} ->
{ok, AuthResult#{anonymous => false, auth_result => success}};

Expand All @@ -45,10 +54,11 @@ check(#{username := Username}, AuthResult, _)
{ok, AuthResult#{anonymous => true, auth_result => success}};

%% 当 clientid 和 password 为token 且相等的时候为用户登录
check(#{clientid := <<Token:34/binary, _Type/binary>>, username := UserId, password := Token}, AuthResult, #{hash_type := _HashType}) ->
check(#{clientid := <<Token:34/binary, _Type/binary>> = ClientId, username := UserId, password := Token}, AuthResult, #{hash_type := _HashType}) ->
%% io:format("~s ~p UserId: ~p~n", [?FILE, ?LINE, UserId]),
case dgiot_auth:get_session(Token) of
#{<<"objectId">> := UserId} ->
save_client(ClientId),
{stop, AuthResult#{anonymous => false, auth_result => success}};
_ ->
{stop, AuthResult#{anonymous => false, auth_result => password_error}}
Expand Down
18 changes: 17 additions & 1 deletion apps/dgiot_modbus/src/dgiot_modbusc_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,22 @@
zh => <<"是否刷新物模型"/utf8>>
}
},
<<"is_shard">> => #{
order => 9,
type => enum,
required => true,
default => #{<<"value">> => true, <<"label">> => <<""/utf8>>},
enum => [
#{<<"value">> => true, <<"label">> => <<""/utf8>>},
#{<<"value">> => false, <<"label">> => <<""/utf8>>}
],
title => #{
zh => <<"是否分片"/utf8>>
},
description => #{
zh => <<"是否分片存储"/utf8>>
}
},
<<"ico">> => #{
order => 102,
type => string,
Expand Down Expand Up @@ -191,7 +207,7 @@ init(?TYPE, ChannelId, #{
{FileName, MinAddr, MaxAddr} =
case maps:find(<<"filepath">>, Args) of
{ok, FilePath} ->
{FileName1, MinAddr1, MaxAddr1} = dgiot_product_csv:read_csv(ChannelId, FilePath, Is_refresh),
{FileName1, MinAddr1, MaxAddr1} = dgiot_product_csv:read_csv(ChannelId, FilePath, Is_refresh, maps:get(<<"is_shard">>, Args, true)),
%% modbus_tcp:set_addr(ChannelId, MinAddr1, MaxAddr1),
{FileName1, MinAddr1, MaxAddr1};
_ ->
Expand Down
4 changes: 2 additions & 2 deletions apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{sla
{noreply, Dclient#dclient{child = ChildState#{di => Address, step => Step}}};

handle_info({tcp, Buff}, #dclient{channel = ChannelId,
child = #{freq := Freq, maxaddr := Maxaddr, di := Address, filename := FileName, address := StartAddr, data := OldData, step := Step} = ChildState} = Dclient) ->
child = #{freq := Freq, minaddr := MinAddr, maxaddr := Maxaddr, di := Address, filename := FileName, address := StartAddr, data := OldData, step := Step} = ChildState} = Dclient) ->
%% io:format("~s ~p Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]),
Data = modbus_tcp:parse_frame(Buff),
case Address + Step >= Maxaddr of
true ->
EndData = <<OldData/binary, Data/binary>>,
%% io:format("~s ~p EndData = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(EndData)]),
AllData = modbus_tcp:parse_frame(StartAddr, FileName, EndData),
AllData = modbus_tcp:parse_frame(StartAddr, FileName, EndData, MinAddr),
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), "~p recv data ~ts => ~p", [dgiot_datetime:format("YYYY-MM-DD HH:NN:SS"), unicode:characters_to_list(dgiot_json:encode(AllData)), dgiot_utils:binary_to_hex(EndData)]),
erlang:send_after(Freq * 1000, self(), read),
{noreply, Dclient#dclient{child = ChildState#{di => StartAddr, data => <<>>}}};
Expand Down
19 changes: 13 additions & 6 deletions apps/dgiot_modbus/src/modbus/modbus_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([
init/1,
parse_frame/1,
parse_frame/3,
parse_frame/4,
to_frame/1,
build_req_message/1,
get_addr/4,
Expand Down Expand Up @@ -277,7 +277,7 @@ parse_frame(<<_TransactionId:16, _ProtocolId:16, _Size1:16, _Slaveid:8, _FunCode
%% 0000 0018 0280 0000 0004 2102 0402 0000 0005 0017001C001C0000001802800000000421020402
%% 00000001000200030004000540C0000041000000412000004140000041600000418000004190000041A0000041B0000041C0000041D0000041E000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000063
%% 3F800000400000004080000040C0000041000000412000004140000041600000418000004190000041A0000041B0000041C0000041D0000041E0000041F00000420000004208000000000000000000000000000000000000000000000000000000000000000000000000000000000000426000004268000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000630000000000000000000000000000006B
parse_frame(StartAddr, FileName, Data) ->
parse_frame(StartAddr, FileName, Data, MinAddr) ->
AtomName = dgiot_utils:to_atom(FileName),
Things = ets:match(AtomName, {'$1', ['_', '_', '_', '_', '$2', '_', '_', '_', '$3', '_', '_', '_', '_', '_', '_', '$4' | '_']}),
AllData =
Expand All @@ -290,7 +290,14 @@ parse_frame(StartAddr, FileName, Data) ->
_ ->
{Address, 1}
end,
IntOffset = dgiot_utils:to_int(NewAddress) - (StartAddr + 1),
IntOffset =
case dgiot_utils:to_int(MinAddr) of
0 ->
dgiot_utils:to_int(NewAddress);
_ ->
dgiot_utils:to_int(NewAddress) - (StartAddr + 1)
end,

Thing = #{
<<"identifier">> => NewAddress,
<<"dataSource">> => #{
Expand Down Expand Up @@ -358,7 +365,7 @@ parse_frame(StartAddr, FileName, Data) ->
end, <<>>, CacheAck),

Shard_data = modbus_tcp:shard_data(BinData, Calculated),
case dgiot_data:get({modbus_tcp, dgiot_datetime:now_secs()}) of
case dgiot_data:get({modbus_tcp, Devaddr1, dgiot_datetime:now_secs()}) of
not_find ->
spawn(fun() ->
dgiot_device:save(ProductId1, Devaddr1),
Expand All @@ -367,11 +374,11 @@ parse_frame(StartAddr, FileName, Data) ->
end),
ChannelId = dgiot_parse_id:get_channelid(<<"2">>, <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId1, DeviceId, Shard_data}),
dgiot_data:insert({modbus_tcp, dgiot_datetime:now_secs()}, true);
dgiot_data:insert({modbus_tcp, Devaddr1, dgiot_datetime:now_secs()}, true);
_ ->
pass
end,
Ncc#{Devaddr1 => Ack};
Ncc#{Devaddr1 => #{a => CacheAck, b => Shard_data}};
(_, _, Ncc) ->
Ncc
end, #{}, AllData),
Expand Down
64 changes: 22 additions & 42 deletions apps/dgiot_task/src/dgiot_task.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ start(ChannelId, ProductIds) ->
_ ->
pass
end
end, ets:tab2list(dgiot_pnque)).
end, ets:tab2list(?DGIOT_PNQUE)).

save_client(ChannelId, ClientId) ->
case dgiot_data:get(?DGIOT_TASK, ChannelId) of
Expand Down Expand Up @@ -507,35 +507,33 @@ save_td(ProductId, DevAddr, Ack, _AppData) ->
AllData = dgiot_task:get_calculated(ProductId, DevAddr, Collection, Props),
%% 过滤存储值
Storage = dgiot_task:get_storage(AllData, Props),
case Interval > 0 of
true ->
Keys = dgiot_product:get_keys(ProductId),
AllStorageKey = maps:keys(Storage),
case Keys -- AllStorageKey of
List when length(List) == 0 andalso length(AllStorageKey) =/= 0 ->
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage);
_ ->
save_cache_data(DeviceId, CacheData),
Storage
end;
_ ->
save_cache_data(DeviceId, CacheData),
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage)
end
save_cache_data(DeviceId, CacheData),
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, Interval)
end.

%% 处理数据
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage) ->
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, Interval) ->
%% 告警
NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,
dgiot_mqtt:publish(DeviceId, NotificationTopic, dgiot_json:encode(AllData)),
%% 实时数据
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
%% save td
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage),
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
case dgiot_data:get(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}) of
not_find ->
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, dgiot_datetime:now_ms());
Ts ->
case dgiot_datetime:now_ms() - Ts < (Interval * 1000) of
true ->
pass;
_ ->
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, dgiot_datetime:now_ms()),
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage)
end
end,
dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
Storage.

Expand All @@ -544,40 +542,22 @@ save_cache_data(DeviceId, Data) ->
AtomKey = dgiot_utils:to_atom(K),
Acc#{AtomKey => V}
end, #{}, Data),
dgiot_data:insert(?DGIOT_DATA_CACHE, DeviceId, {NewData, dgiot_datetime:now_secs()}).
dgiot_data:insert(?DGIOT_DATA_CACHE, DeviceId, {NewData, dgiot_datetime:now_ms()}).

merge_cache_data(_DeviceId, NewData, 0) ->
NewData;

merge_cache_data(DeviceId, NewData, -1) ->
merge_cache_data(DeviceId, NewData, _) ->
case dgiot_data:get(?DGIOT_DATA_CACHE, DeviceId) of
not_find ->
NewData;
{OldData, _Ts} ->
{OldData, _} ->
NewOldData =
maps:fold(fun(K, V, Acc) ->
Key = dgiot_utils:to_binary(K),
Acc#{Key => V}
end, #{}, OldData),
dgiot_map:merge(NewOldData, NewData)
end;

merge_cache_data(DeviceId, NewData, Interval) ->
case dgiot_data:get(?DGIOT_DATA_CACHE, DeviceId) of
not_find ->
NewData;
{OldData, Ts} ->
case dgiot_datetime:now_secs() - Ts < Interval of
true ->
NewOldData =
maps:fold(fun(K, V, Acc) ->
Key = dgiot_utils:to_binary(K),
Acc#{Key => V}
end, #{}, OldData),
dgiot_map:merge(NewOldData, NewData);
false ->
NewData
end
end.

save_td_no_match(ProductId, DevAddr, Ack, AppData) ->
Expand All @@ -595,6 +575,6 @@ save_td_no_match(ProductId, DevAddr, Ack, AppData) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
Interval = maps:get(<<"interval">>, AppData, 3),
AllData = merge_cache_data(DeviceId, Storage, Interval),
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage),
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, Interval),
AllData
end.
6 changes: 3 additions & 3 deletions apps/dgiot_task/src/dgiot_task_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ handle_info(read, State) ->
{noreply, send_msg(State)};

%% ACK消息触发进行新的指令发送
handle_info({dclient_ack, Topic, Payload}, #dclient{channel = ChannelId, userdata = Usedata} = State) ->
handle_info({dclient_ack, Topic, Payload}, #dclient{channel = _ChannelId, userdata = Usedata} = State) ->
dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1),
case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] ->
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), ProductId, DevAddr, "~s ~p recv => ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(dgiot_json:encode(Payload))]),
%% dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), ProductId, DevAddr, "~s ~p recv => ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(dgiot_json:encode(Payload))]),
dgiot_task:save_td(ProductId, DevAddr, Payload, #{}),
{noreply, send_msg(State#dclient{userdata = Usedata#device_task{product = ProductId, devaddr = DevAddr}})};
_ ->
Expand Down Expand Up @@ -147,7 +147,7 @@ send_msg(#dclient{channel = ChannelId, clock = #dclock{freq = Freq}, userdata =
Payload = dgiot_json:encode(DataSource#{<<"identifier">> => Identifier1, <<"_dgiotTaskFreq">> => Freq}),
%% io:format("~s ~p DataSource = ~p.~n", [?FILE, ?LINE, DataSource]),
dgiot_mqtt:publish(dgiot_utils:to_binary(ChannelId), Topic, Payload),
dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(Payload)]),
%% dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(Payload)]),
{Count + 1, Acc ++ [DataSource], Acc1 ++ [Identifier1]};
_ ->
{Count, Acc, Acc1}
Expand Down
Loading

0 comments on commit 03e0e99

Please sign in to comment.