Skip to content

Commit

Permalink
go extension filter state get api (envoyproxy#27864)
Browse files Browse the repository at this point in the history
* go extension filter state get api

Signed-off-by: wangkai19 <[email protected]>

* fix some review comments

Signed-off-by: wangkai19 <[email protected]>

* fix review comments

Signed-off-by: wangkai19 <[email protected]>

* remove unnecessary lock

Signed-off-by: wangkai19 <[email protected]>

---------

Signed-off-by: wangkai19 <[email protected]>
  • Loading branch information
StarryVae committed Jun 13, 2023
1 parent 158b726 commit ee3a545
Show file tree
Hide file tree
Showing 18 changed files with 136 additions and 12 deletions.
7 changes: 7 additions & 0 deletions contrib/golang/common/dso/dso.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ HttpFilterDsoImpl::HttpFilterDsoImpl(const std::string dso_name) : HttpFilterDso
envoy_go_filter_on_http_data_, handler_, dso_name, "envoyGoFilterOnHttpData");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_on_http_destroy_)>(
envoy_go_filter_on_http_destroy_, handler_, dso_name, "envoyGoFilterOnHttpDestroy");
loaded_ &= dlsymInternal<decltype(envoy_go_filter_go_request_sema_dec_)>(
envoy_go_filter_go_request_sema_dec_, handler_, dso_name, "envoyGoRequestSemaDec");
}

GoUint64 HttpFilterDsoImpl::envoyGoFilterNewHttpPluginConfig(GoUint64 p0, GoUint64 p1, GoUint64 p2,
Expand Down Expand Up @@ -87,6 +89,11 @@ void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) {
envoy_go_filter_on_http_destroy_(p0, GoUint64(p1));
}

void HttpFilterDsoImpl::envoyGoRequestSemaDec(httpRequest* p0) {
ASSERT(envoy_go_filter_go_request_sema_dec_ != nullptr);
envoy_go_filter_go_request_sema_dec_(p0);
}

ClusterSpecifierDsoImpl::ClusterSpecifierDsoImpl(const std::string dso_name)
: ClusterSpecifierDso(dso_name) {
loaded_ &= dlsymInternal<decltype(envoy_go_cluster_specifier_new_plugin_)>(
Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/common/dso/dso.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class HttpFilterDso : public Dso {
virtual GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE;
};

class HttpFilterDsoImpl : public HttpFilterDso {
Expand All @@ -54,6 +55,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 p3) override;
GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;

private:
GoUint64 (*envoy_go_filter_new_http_plugin_config_)(GoUint64 p0, GoUint64 p1, GoUint64 p2,
Expand All @@ -65,6 +67,7 @@ class HttpFilterDsoImpl : public HttpFilterDso {
GoUint64 (*envoy_go_filter_on_http_data_)(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr};
};

class ClusterSpecifierDso : public Dso {
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/libgolang.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ extern GoUint64 envoyGoFilterOnHttpData(httpRequest* r,
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy
extern void envoyGoFilterOnHttpDestroy(httpRequest* r, GoUint64 reason);

// go:linkname envoyGoRequestSemaDec
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoRequestSemaDec
extern void envoyGoRequestSemaDec(httpRequest* r);

// go:linkname envoyGoOnClusterSpecify
// github.com/envoyproxy/envoy/contrib/golang/router/cluster_specifier/source/go/pkg/cluster_specifier.envoyGoOnClusterSpecify
extern GoInt64 envoyGoOnClusterSpecify(GoUint64 pluginPtr, // NOLINT(readability-identifier-naming)
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/dso/test/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class MockHttpFilterDsoImpl : public HttpFilterDso {
MOCK_METHOD(GoUint64, envoyGoFilterOnHttpData,
(httpRequest * p0, GoUint64 p1, GoUint64 p2, GoUint64 p3));
MOCK_METHOD(void, envoyGoFilterOnHttpDestroy, (httpRequest * p0, int p1));
MOCK_METHOD(void, envoyGoRequestSemaDec, (httpRequest * p0));
};

} // namespace Dso
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/common/dso/test/test_data/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,9 @@ func envoyGoOnClusterSpecify(pluginPtr uint64, headerPtr uint64, pluginId uint64
return 0
}

//export envoyGoRequestSemaDec
func envoyGoRequestSemaDec(r *C.httpRequest) {
}

func main() {
}
9 changes: 9 additions & 0 deletions contrib/golang/filters/http/source/cgo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ CAPIStatus envoyGoFilterHttpSetStringFilterState(void* r, void* key, void* value
});
}

CAPIStatus envoyGoFilterHttpGetStringFilterState(void* r, void* key, void* value) {
return envoyGoFilterHandlerWrapper(r,
[key, value](std::shared_ptr<Filter>& filter) -> CAPIStatus {
auto key_str = referGoString(key);
auto value_str = reinterpret_cast<GoString*>(value);
return filter->getStringFilterState(key_str, value_str);
});
}

#ifdef __cplusplus
}
#endif
Expand Down
1 change: 0 additions & 1 deletion contrib/golang/filters/http/source/go/pkg/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ licenses(["notice"]) # Apache 2
go_library(
name = "api",
srcs = [
"capi.go",
"cgocheck.go",
"filter.go",
"type.go",
Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ typedef enum { // NOLINT(modernize-use-using)
CAPINotInGo = -3,
CAPIInvalidPhase = -4,
CAPIValueNotFound = -5,
CAPIYield = -6,
} CAPIStatus;

CAPIStatus envoyGoFilterHttpContinue(void* r, int status);
Expand Down Expand Up @@ -71,6 +72,7 @@ void envoyGoFilterHttpFinalize(void* r, int reason);

CAPIStatus envoyGoFilterHttpSetStringFilterState(void* r, void* key, void* value, int state_type,
int life_span, int stream_sharing);
CAPIStatus envoyGoFilterHttpGetStringFilterState(void* r, void* key, void* value);

#ifdef __cplusplus
} // extern "C"
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/go/pkg/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ const (

type FilterState interface {
SetString(key, value string, stateType StateType, lifeSpan LifeSpan, streamSharing StreamSharing)
GetString(key string) string
}
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/go/pkg/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
name = "http",
srcs = [
"api.h",
"capi.go",
"capi_impl.go",
"config.go",
"filter.go",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
* limitations under the License.
*/

package api
package http

import "unsafe"
import (
"unsafe"

"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/api"
)

type HttpCAPI interface {
HttpContinue(r unsafe.Pointer, status uint64)
Expand All @@ -33,7 +37,7 @@ type HttpCAPI interface {
HttpRemoveHeader(r unsafe.Pointer, key *string)

HttpGetBuffer(r unsafe.Pointer, bufferPtr uint64, value *string, length uint64)
HttpSetBufferHelper(r unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
HttpSetBufferHelper(r unsafe.Pointer, bufferPtr uint64, value string, action api.BufferAction)

HttpCopyTrailers(r unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetTrailer(r unsafe.Pointer, key *string, value *string, add bool)
Expand All @@ -45,10 +49,11 @@ type HttpCAPI interface {
// TODO: HttpGetDynamicMetadata(r unsafe.Pointer, filterName string) map[string]interface{}
HttpSetDynamicMetadata(r unsafe.Pointer, filterName string, key string, value interface{})

HttpLog(level LogType, message string)
HttpLogLevel() LogType
HttpLog(level api.LogType, message string)
HttpLogLevel() api.LogType

HttpFinalize(r unsafe.Pointer, reason int)

HttpSetStringFilterState(r unsafe.Pointer, key string, value string, stateType StateType, lifeSpan LifeSpan, streamSharing StreamSharing)
HttpSetStringFilterState(r unsafe.Pointer, key string, value string, stateType api.StateType, lifeSpan api.LifeSpan, streamSharing api.StreamSharing)
HttpGetStringFilterState(r *httpRequest, key string) string
}
20 changes: 18 additions & 2 deletions contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"reflect"
"runtime"
"strings"
"sync/atomic"
"unsafe"

"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/api"
Expand Down Expand Up @@ -263,14 +264,29 @@ func (c *httpCApiImpl) HttpFinalize(r unsafe.Pointer, reason int) {
C.envoyGoFilterHttpFinalize(r, C.int(reason))
}

var cAPI api.HttpCAPI = &httpCApiImpl{}
var cAPI HttpCAPI = &httpCApiImpl{}

// SetHttpCAPI for mock cAPI
func SetHttpCAPI(api api.HttpCAPI) {
func SetHttpCAPI(api HttpCAPI) {
cAPI = api
}

func (c *httpCApiImpl) HttpSetStringFilterState(r unsafe.Pointer, key string, value string, stateType api.StateType, lifeSpan api.LifeSpan, streamSharing api.StreamSharing) {
res := C.envoyGoFilterHttpSetStringFilterState(r, unsafe.Pointer(&key), unsafe.Pointer(&value), C.int(stateType), C.int(lifeSpan), C.int(streamSharing))
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpGetStringFilterState(r *httpRequest, key string) string {
var value string
r.sema.Add(1)
res := C.envoyGoFilterHttpGetStringFilterState(unsafe.Pointer(r.req), unsafe.Pointer(&key), unsafe.Pointer(&value))
if res == C.CAPIYield {
atomic.AddInt32(&r.waitingOnEnvoy, 1)
r.sema.Wait()
} else {
r.sema.Done()
handleCApiStatus(res)
}

return strings.Clone(value)
}
13 changes: 10 additions & 3 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ package http
import "C"
import (
"fmt"
"sync"
"unsafe"

"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/api"
Expand All @@ -57,9 +58,11 @@ type panicInfo struct {
details string
}
type httpRequest struct {
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
req *C.httpRequest
httpFilter api.StreamFilter
pInfo panicInfo
sema sync.WaitGroup
waitingOnEnvoy int32
}

func (r *httpRequest) pluginName() string {
Expand Down Expand Up @@ -214,3 +217,7 @@ func (s *streamInfo) FilterState() api.FilterState {
func (f *filterState) SetString(key, value string, stateType api.StateType, lifeSpan api.LifeSpan, streamSharing api.StreamSharing) {
cAPI.HttpSetStringFilterState(unsafe.Pointer(f.request.req), key, value, stateType, lifeSpan, streamSharing)
}

func (f *filterState) GetString(key string) string {
return cAPI.HttpGetStringFilterState(f.request, key)
}
13 changes: 13 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"

"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/api"
)
Expand Down Expand Up @@ -208,6 +209,9 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {
req := getRequest(r)
// do nothing even when req.panic is true, since filter is already destroying.
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}

v := api.DestroyReason(reason)

Expand All @@ -216,3 +220,12 @@ func envoyGoFilterOnHttpDestroy(r *C.httpRequest, reason uint64) {

Requests.DeleteReq(r)
}

//export envoyGoRequestSemaDec
func envoyGoRequestSemaDec(r *C.httpRequest) {
req := getRequest(r)
defer req.RecoverPanic()
if atomic.CompareAndSwapInt32(&req.waitingOnEnvoy, 1, 0) {
req.sema.Done()
}
}
44 changes: 44 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,50 @@ CAPIStatus Filter::setStringFilterState(absl::string_view key, absl::string_view
return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::getStringFilterState(absl::string_view key, GoString* value_str) {
// lock until this function return since it may running in a Go thread.
Thread::LockGuard lock(mutex_);
if (has_destroyed_) {
ENVOY_LOG(debug, "golang filter has been destroyed");
return CAPIStatus::CAPIFilterIsDestroy;
}

auto& state = getProcessorState();
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang filter is not processing Go");
return CAPIStatus::CAPINotInGo;
}

if (state.isThreadSafe()) {
auto go_filter_state =
state.streamInfo().filterState()->getDataReadOnly<GoStringFilterState>(key);
if (go_filter_state) {
req_->strValue = go_filter_state->value();
value_str->p = req_->strValue.data();
value_str->n = req_->strValue.length();
}
} else {
auto key_str = std::string(key);
auto weak_ptr = weak_from_this();
state.getDispatcher().post([this, &state, weak_ptr, key_str, value_str] {
if (!weak_ptr.expired() && !hasDestroyed()) {
auto go_filter_state =
state.streamInfo().filterState()->getDataReadOnly<GoStringFilterState>(key_str);
if (go_filter_state) {
req_->strValue = go_filter_state->value();
value_str->p = req_->strValue.data();
value_str->n = req_->strValue.length();
}
dynamic_lib_->envoyGoRequestSemaDec(req_);
} else {
ENVOY_LOG(info, "golang filter has gone or destroyed in setStringFilterState");
}
});
return CAPIStatus::CAPIYield;
}
return CAPIStatus::CAPIOK;
}

/* ConfigId */

uint64_t Filter::getMergedConfigId(ProcessorState& state) {
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Filter : public Http::StreamFilter,
CAPIStatus setDynamicMetadata(std::string filter_name, std::string key, absl::string_view buf);
CAPIStatus setStringFilterState(absl::string_view key, absl::string_view value, int state_type,
int life_span, int stream_sharing);
CAPIStatus getStringFilterState(absl::string_view key, GoString* value_str);

private:
bool hasDestroyed() {
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ name: golang
codec_client_->sendTrailers(request_encoder, request_trailers);

waitForNextUpstreamRequest();

EXPECT_EQ("go_state_test_value",
getHeader(upstream_request_->headers(), "go-state-test-header-key"));

// original header: x-test-header-0
EXPECT_EQ("foo", getHeader(upstream_request_->headers(), "x-test-header-0"));

Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/filters/http/test/test_data/basic/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func (f *filter) decodeHeaders(header api.RequestHeaderMap, endStream bool) api.
fs := f.callbacks.StreamInfo().FilterState()
fs.SetString("go_state_test_key", "go_state_test_value", api.StateTypeReadOnly, api.LifeSpanRequest, api.SharedWithUpstreamConnection)

val := fs.GetString("go_state_test_key")
header.Add("go-state-test-header-key", val)

if strings.Contains(f.localreplay, "decode-header") {
return f.sendLocalReply("decode-header")
}
Expand Down

0 comments on commit ee3a545

Please sign in to comment.