Skip to content

Commit

Permalink
feat: task,import wmx
Browse files Browse the repository at this point in the history
  • Loading branch information
dawnwinterLiu committed Apr 28, 2024
1 parent b8e39c0 commit 96d85c6
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 31 deletions.
52 changes: 48 additions & 4 deletions apps/dgiot/src/utils/dgiot_csv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
-include("dgiot.hrl").
-include_lib("dgiot/include/logger.hrl").
-export([
read_from_csv/2
read_from_csv/2
, save_csv_ets/2
, read_csv/3
, save_csv_ets/1
, post_properties/1
, post_properties/2
]).

read_from_csv(Path, Fun) ->
Expand Down Expand Up @@ -97,7 +97,8 @@ save_csv_ets(#{<<"fullpath">> := Fullpath}) ->
AtomName.


post_properties(Things) ->
post_properties(<<"plc">>, AtomName) ->
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}),
lists:foldl(fun([Index, Devicetype, Name, Identifier, Address, Originaltype, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) ->
Acc#{
to_lower(Identifier) => #{
Expand Down Expand Up @@ -133,7 +134,50 @@ post_properties(Things) ->
<<"moduleType">> => <<"properties">>,
<<"isaccumulate">> => false
}}
end, #{}, Things).
end, #{}, Things);

post_properties(<<"dlink">>, AtomName) ->
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}),
lists:foldl(fun([Index, Devicetype, Name, Identifier, Key, Len, AccessMode, Min_Max, Unit, Type, Specs | _], Acc) ->
Acc#{
to_lower(Identifier) => #{
<<"name">> => Name,
<<"index">> => Index,
<<"isstorage">> => true,
<<"isshow">> => true,
<<"dataForm">> => #{
<<"address">> => <<"0">>,
<<"rate">> => 1,
<<"order">> => 0,
<<"round">> => <<"all">>,
<<"offset">> => 0,
<<"control">> => <<"%{d}">>,
<<"iscount">> => <<"0">>,
<<"protocol">> => <<"DLINK">>,
<<"strategy">> => <<"Ö÷¶¯Éϱ¨"/utf8>>,
<<"collection">> => <<"%{s}">>,
<<"countround">> => <<"all">>,
<<"countstrategy">> => 3,
<<"countcollection">> => <<"%{s}">>
},
<<"dataType">> => get_dataType(to_lower(Type), Min_Max, Unit, Specs),
<<"required">> => true,
<<"accessMode">> => get_accessmode(AccessMode),
<<"dataSource">> => #{
<<"_dlinkindex">> => <<"1">>,
<<"dis">> => [
#{<<"key">> => Key, <<"data">> => Len}
]
},
<<"devicetype">> => Devicetype,
<<"identifier">> => to_lower(Identifier),
<<"moduleType">> => <<"properties">>,
<<"isaccumulate">> => false
}}
end, #{}, Things);

post_properties(_, _) ->
error.

get_accessmode(<<229, 143, 170, 232, 175, 187>>) ->
<<"r">>;
Expand Down
40 changes: 22 additions & 18 deletions apps/dgiot_api/src/handler/dgiot_data_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,26 +274,30 @@ do_request(post_export_data, #{<<"classname">> := Name} = Body, #{<<"sessionToke
end;

%% 导入物模型
do_request(post_import_wmxdata, #{<<"objectId">> := ProductId, <<"file">> := File} = _Args, _Context, _Req) ->
do_request(post_import_wmxdata, #{<<"type">> := Type, <<"objectId">> := ProductId, <<"file">> := File} = _Args, _Context, _Req) ->
%% io:format("~s ~p Args =~p.~n", [?FILE, ?LINE, Args]),
AtomName = dgiot_csv:save_csv_ets(File),
Things = ets:match(AtomName, {'$1', ['$2', '$3', '$4', '$5', '$6', '$7', '$8', '$9', '$10', '$11' | '_']}),
NewProperties = dgiot_csv:post_properties(Things),
case dgiot_parsex:get_object(<<"Product">>, ProductId) of
{ok, #{<<"thing">> := Thing}} ->
OldProperties =
lists:foldl(fun(#{<<"identifier">> := Identifier} = X, Acc) ->
Acc#{Identifier => X}
end, #{}, maps:get(<<"properties">>, Thing, [])),
Properties =
maps:fold(fun(_, Prop, Acc) ->
Acc ++ [Prop]
end, [], dgiot_map:merge(OldProperties, NewProperties)),
dgiot_parsex:update_object(<<"Product">>, ProductId, #{<<"thing">> => Thing#{<<"properties">> => Properties}});
_ ->
pass
end,
{ok, #{<<"code">> => 200, <<"msg">> => <<"success">>}};
case dgiot_csv:post_properties(Type, AtomName) of
error ->
{ok, #{<<"code">> => 500, <<"msg">> => <<"error">>}};
NewProperties ->
case dgiot_parsex:get_object(<<"Product">>, ProductId) of
{ok, #{<<"thing">> := Thing}} ->
OldProperties =
lists:foldl(fun(#{<<"identifier">> := Identifier} = X, Acc) ->
Acc#{Identifier => X}
end, #{}, maps:get(<<"properties">>, Thing, [])),
Properties =
maps:fold(fun(_, Prop, Acc) ->
Acc ++ [Prop]
end, [], dgiot_map:merge(OldProperties, NewProperties)),
dgiot_parsex:update_object(<<"Product">>, ProductId, #{<<"thing">> => Thing#{<<"properties">> => Properties}});
_ ->
pass
end,
{ok, #{<<"code">> => 200, <<"msg">> => <<"success">>}}
end;


%% DB 概要: 导库 描述:json文件导库
%% OperationId:post_import_data
Expand Down
21 changes: 14 additions & 7 deletions apps/dgiot_task/src/dgiot_task.erl
Original file line number Diff line number Diff line change
Expand Up @@ -520,18 +520,25 @@ dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, Interval) ->
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
%% save td
Now = dgiot_datetime:now_ms(),
case dgiot_data:get(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}) of
not_find ->
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, dgiot_datetime:now_ms());
Ts ->
case dgiot_datetime:now_ms() - Ts < (Interval * 1000) of
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, Now);
OldTs ->
case Now - OldTs < (Interval * 500) of
true ->
pass;
_ ->
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, dgiot_datetime:now_ms()),
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage)
case dgiot_data:get({save_td, DeviceId, dgiot_datetime:now_secs()}) of
not_find ->
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, Now),
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage),
dgiot_data:insert({save_td, DeviceId, dgiot_datetime:now_secs()}, true);
_ ->
pass
end
end
end,
dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_tdengine/src/dgiot_tdengine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ batch(Channel, Batch) ->
batch_sql(Channel, Sql) ->
transaction(Channel,
fun(Context) ->
dgiot_tdengine_pool:insert_sql(Context#{<<"channel">> => Channel}, execute_update, Sql)
dgiot_tdengine_pool:run_sql(Context#{<<"channel">> => Channel}, execute_update, Sql)
end).

create_user(UserName, Password) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
-include("dgiot_tdengine.hrl").
-include_lib("dgiot/include/logger.hrl").

-export([add_field/5, get_field/1, check_fields/2, check_fields/3, get_time/2, check_value/3, get_field_type/1]).
-export([add_field/5, get_field/1, check_fields/2, check_fields/3, get_time/2, check_value/3, get_field_type/1, check_validate/2]).

add_field(#{<<"type">> := <<"enum">>}, Database, TableName, LowerIdentifier, FieldType) ->
<<"ALTER TABLE ", Database/binary, TableName/binary, " ADD ", FieldType/binary, " ", LowerIdentifier/binary, " INT;">>;
Expand Down

0 comments on commit 96d85c6

Please sign in to comment.