Skip to content

Commit

Permalink
fix: update
Browse files Browse the repository at this point in the history
  • Loading branch information
dawnwinterLiu committed May 10, 2024
1 parent 96d85c6 commit d22891c
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 54 deletions.
2 changes: 1 addition & 1 deletion apps/dgiot/src/utils/dgiot_csv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ post_properties(<<"dlink">>, AtomName) ->
<<"control">> => <<"%{d}">>,
<<"iscount">> => <<"0">>,
<<"protocol">> => <<"DLINK">>,
<<"strategy">> => <<"Ö÷¶¯Éϱ¨"/utf8>>,
<<"strategy">> => <<"主动上报"/utf8>>,
<<"collection">> => <<"%{s}">>,
<<"countround">> => <<"all">>,
<<"countstrategy">> => 3,
Expand Down
28 changes: 16 additions & 12 deletions apps/dgiot_device/src/handler/dgiot_tdengine_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,22 @@ do_request(get_echart_deviceid, #{<<"deviceid">> := DeviceId, <<"style">> := Sty

%% TDengine 概要: 获取当前设备最新时序数据卡片
do_request(get_devicecard_deviceid, #{<<"deviceid">> := DeviceId} = Args, #{<<"sessionToken">> := SessionToken} = _Context, _Req) ->
case dgiot_product_tdengine:get_channel(SessionToken) of
{error, Error} ->
{error, Error};
{ok, Channel} ->
%% ?LOG(info,"DeviceId ~p", [DeviceId]),
case dgiot_parsex:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} ->
dgiot_mqtt:subscribe_route_key([<<"$dg/user/realtimecard/", DeviceId/binary, "/#">>], <<"realtimecard">>, SessionToken),
dgiot_device_card:get_device_card(Channel, ProductId, DeviceId, Args);
_ ->
{error, <<"not find device">>}
end
case dgiot_data:get({last_data, DeviceId}) of
not_find ->
case dgiot_product_tdengine:get_channel(SessionToken) of
{error, Error} ->
{error, Error};
{ok, Channel} ->
case dgiot_parsex:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"objectId">> := DeviceId, <<"product">> := #{<<"objectId">> := ProductId}}} ->
dgiot_mqtt:subscribe_route_key([<<"$dg/user/realtimecard/", DeviceId/binary, "/#">>], <<"realtimecard">>, SessionToken),
dgiot_device_card:get_device_card(Channel, ProductId, DeviceId, Args);
_ ->
{error, <<"not find device">>}
end
end;
Data ->
{ok, #{<<"data">> => Data}}
end;

%% TDengine 概要: 获取gps轨迹c
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_modbus/src/dgiot_modbusc_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@
order => 9,
type => enum,
required => true,
default => #{<<"value">> => true, <<"label">> => <<""/utf8>>},
default => #{<<"value">> => false, <<"label">> => <<""/utf8>>},
enum => [
#{<<"value">> => true, <<"label">> => <<""/utf8>>},
#{<<"value">> => false, <<"label">> => <<""/utf8>>}
Expand Down
13 changes: 8 additions & 5 deletions apps/dgiot_modbus/src/modbus/modbus_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,19 @@ parse_frame(StartAddr, FileName, Data, MinAddr) ->
end, <<>>, CacheAck),

Shard_data = modbus_tcp:shard_data(BinData, Calculated),
case dgiot_data:get({modbus_tcp, Devaddr1, dgiot_datetime:now_secs()}) of
Now = dgiot_datetime:now_secs(),
case dgiot_data:get({modbus_tcp, Devaddr1, Now}) of
not_find ->
spawn(fun() ->
dgiot_device:save(ProductId1, Devaddr1),
Sql = dgiot_tdengine:format_sql(ProductId1, Devaddr1, [Shard_data]),
Sql = dgiot_tdengine:format_sql(ProductId1, Devaddr1, [Shard_data#{<<"createdat">> => Now * 1000}]),
dgiot_tdengine_adapter:save_sql(ProductId1, Sql)
end),
ChannelId = dgiot_parse_id:get_channelid(<<"2">>, <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId1, DeviceId, Shard_data}),
dgiot_data:insert({modbus_tcp, Devaddr1, dgiot_datetime:now_secs()}, true);
%% ChannelId = dgiot_parse_id:get_channelid(<<"2">>, <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
%% dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId1, DeviceId, Shard_data}),
RealData = dgiot_device_card:get_card(ProductId1, [Shard_data#{<<"createdat">> => Now * 1000}], DeviceId, #{}, dgiot_data:get({shard_storage, ProductId1})),
dgiot_data:insert({last_data, DeviceId}, RealData),
dgiot_data:insert({modbus_tcp, Devaddr1, Now}, true);
_ ->
pass
end,
Expand Down
26 changes: 4 additions & 22 deletions apps/dgiot_task/src/dgiot_task.erl
Original file line number Diff line number Diff line change
Expand Up @@ -512,36 +512,18 @@ save_td(ProductId, DevAddr, Ack, _AppData) ->
end.

%% 处理数据
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, Interval) ->
dealwith_data(ProductId, DevAddr, DeviceId, AllData, Storage, _Interval) ->
%% 告警
NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,
dgiot_mqtt:publish(DeviceId, NotificationTopic, dgiot_json:encode(AllData)),
%% 实时数据
ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>),
dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
%% save td
Now = dgiot_datetime:now_ms(),
case dgiot_data:get(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}) of
not_find ->
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, Now);
OldTs ->
case Now - OldTs < (Interval * 500) of
true ->
pass;
_ ->
case dgiot_data:get({save_td, DeviceId, dgiot_datetime:now_secs()}) of
not_find ->
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
dgiot_data:insert(?DGIOT_DATA_CACHE, {save_cache_time, DeviceId}, Now),
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage),
dgiot_data:insert({save_td, DeviceId, dgiot_datetime:now_secs()}, true);
_ ->
pass
end
end
end,
dgiot_tdengine_adapter:save(ProductId, DevAddr, Storage),
dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
Channel = dgiot_product_channel:get_taskchannel(ProductId),
dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(dgiot_json:encode(Storage))]),
Storage.

save_cache_data(DeviceId, Data) ->
Expand Down
28 changes: 15 additions & 13 deletions apps/dgiot_tdengine/src/utils/dgiot_tdengine_field.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,14 @@ check_value(Value, ProductId, Field) ->
Specs = maps:get(<<"specs">>, DataType, #{}),
Type1 = list_to_binary(string:to_upper(binary_to_list(Type))),
NewValue = get_type_value(Type1, Value, Specs),
case check_validate(NewValue, Specs) of
true ->
NewValue;
false ->
BinNewValue = dgiot_utils:to_binary(NewValue),
throw({error, <<Field/binary, "=", BinNewValue/binary, " is not validate">>})
end
NewValue
%% case check_validate(NewValue, Specs) of
%% true ->
%% NewValue;
%% false ->
%% BinNewValue = dgiot_utils:to_binary(NewValue),
%% throw({error, <<Field/binary, "=", BinNewValue/binary, " is not validate">>})
%% end
end.

check_fields(Data, #{<<"properties">> := Props}) ->
Expand Down Expand Up @@ -150,12 +151,13 @@ check_field(Data, #{<<"identifier">> := Field, <<"dataType">> := #{<<"type">> :=
Value ->
Type1 = list_to_binary(string:to_upper(binary_to_list(Type))),
NewValue = get_type_value(Type1, Value, Specs),
case check_validate(NewValue, Specs) of
true ->
NewValue;
false ->
throw({error, <<Field/binary, " is not validate">>})
end
NewValue
%% case check_validate(NewValue, Specs) of
%% true ->
%% NewValue;
%% false ->
%% throw({error, <<Field/binary, " is not validate">>})
%% end
end;

check_field(_, _) ->
Expand Down
1 change: 1 addition & 0 deletions apps/dgiot_topo/src/dgiot_topo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ get_name(ProductId, K, V) ->
send_realtime_card(ProductId, DeviceId, Payload) ->
Data = dgiot_device_card:get_card(ProductId, [Payload], DeviceId, #{}, dgiot_data:get({shard_storage, ProductId})),
Pubtopic = <<"$dg/user/realtimecard/", DeviceId/binary, "/report">>,
dgiot_data:insert({last_data, DeviceId}, Data),
dgiot_mqtt:publish(self(), Pubtopic, base64:encode(dgiot_json:encode(#{<<"data">> => Data}))).

%% 发送实时组态数据
Expand Down

0 comments on commit d22891c

Please sign in to comment.