
gcs-pubsub pr #1658

Open · mhite wants to merge 9 commits into main

Conversation

@mhite (Contributor) commented Jan 6, 2023:

A work in progress MVP for #1034

@mhite requested a review from Jeffail as a code owner on January 6, 2023 00:10
@mhite (Contributor, Author) commented Jan 6, 2023:

Pardon my mediocre diagram, but this is the scenario I am using to test an MVP. Read it from the bottom up: logs are generated and flushed periodically as NDJSON files to a GCS bucket. Each upload triggers a bucket event notification to a Pub/Sub topic, which is then delivered to a subscription that Benthos watches. Benthos reads the event notification information from the message to determine the bucket and object name, and the input then downloads the referenced file.

[diagram: gcs-notification]

See this repository for the Pulumi code that builds the aforementioned scenario:

https://github.com/mhite/benthos-gcp-dev/tree/main/input_cloud_storage

@loicalleyne (Contributor):

Thank you for working on this. I have exactly this use case and was thinking about an implementation; glad you got to it first.

@loicalleyne (Contributor) commented Jan 6, 2023:

@mhite is this already functional? What parts are still in progress? Is it only the TODO items in the comments?

For the gcsfuse 0-byte object thing, IMO it's probably OK to just immediately acknowledge the Pub/Sub message if the message attribute size == "0" and move on.

I'm testing on a local fork on my machine and I think you can do the above like this:

```go
func (ps *pubsubTargetReader) parseObjectPath(pubsubMsgAttributes map[string]string) (*gcpCloudStorageObjectTarget, error) {
	objectSize, ok := pubsubMsgAttributes["size"]
	if !ok {
		return nil, errors.New("pub/sub message missing size attribute")
	}
	if objectSize == "0" {
		return nil, errors.New("object size is 0 bytes")
	}
	// ... rest of the existing parsing logic ...
}
```

@mhite (Contributor, Author) commented Jan 6, 2023:

> @mhite is this already functional? What parts are still in progress? Is it only the TODO items in the comments?

Yes, I've noted stuff that I don't understand/want to discuss/need to fix in the TODO items.

It does run and "works", but it is not merge-worthy yet due to some of the outstanding issues I've noted.

> For the gcsfuse 0-byte object thing, IMO it's probably OK to just immediately acknowledge the Pub/Sub message if the message attribute size == "0" and move on.
>
> I'm testing on a local fork on my machine and I think you can do the above like this:
>
> ```go
> func (ps *pubsubTargetReader) parseObjectPath(pubsubMsgAttributes map[string]string) (*gcpCloudStorageObjectTarget, error) {
> 	objectSize, ok := pubsubMsgAttributes["size"]
> 	if !ok {
> 		return nil, errors.New("pub/sub message missing size attribute")
> 	}
> 	if objectSize == "0" {
> 		return nil, errors.New("object size is 0 bytes")
> 	}
> 	// ... rest of the existing parsing logic ...
> }
> ```

Yes, I would take a similar approach, although you won't find size in pubsubMsgAttributes but rather in the message body as a JSON key. Also, if it was a notification for a 0-byte file, I think we'd also want to "ack" the message to delete it so it doesn't get redelivered to us.
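
For illustration, a minimal sketch of that body-based check (the types here are hypothetical, not this PR's code; the JSON notification payload carries size and generation as strings):

```go
package gcsnotify

import (
	"encoding/json"
	"fmt"
)

// gcsNotificationBody models the fields of interest in the JSON body of a
// GCS event notification; note that size and generation arrive as strings.
type gcsNotificationBody struct {
	Bucket     string `json:"bucket"`
	Name       string `json:"name"`
	Size       string `json:"size"`
	Generation string `json:"generation"`
}

// isZeroByteNotification reports whether a notification body refers to a
// 0-byte object, in which case the message should be acked and skipped.
func isZeroByteNotification(data []byte) (bool, error) {
	var body gcsNotificationBody
	if err := json.Unmarshal(data, &body); err != nil {
		return false, fmt.Errorf("parsing notification body: %w", err)
	}
	return body.Size == "0", nil
}
```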

@loicalleyne (Contributor):

Maybe it makes sense to check for size == "0" even before checking the event type?

What do you think about a config option pointing to a cache resource to de-duplicate on gcs_key? At work we built an event-based log ingestion pipeline, and one of the pain points was handling cases where a production system uploads the same log file twice. I think it would make sense to de-duplicate based on the Pub/Sub message content before the object is even retrieved.
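
For illustration, a rough sketch of that guard (the cache interface below is a hypothetical stand-in, not the actual Benthos cache API):

```go
package dedupe

import "fmt"

// keyCache stands in for a Benthos cache resource (hypothetical interface);
// Add stores the key only if it is not already present and reports whether
// the store happened.
type keyCache interface {
	Add(key string) bool
}

// shouldProcess derives a de-duplication key from the notification and
// consults the cache before the object is retrieved, so a second
// notification for the same gcs_key is dropped without a download.
func shouldProcess(c keyCache, bucket, name string) bool {
	return c.Add(fmt.Sprintf("%s/%s", bucket, name))
}
```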

@mhite (Contributor, Author) commented Jan 7, 2023:

> Maybe it makes sense to check for size == "0" even before checking the event type?

Well, the eventType attribute of OBJECT_FINALIZE in the attributes/metadata gives us enough information about the Pub/Sub message to know it is a valid notification message, and therefore we know the format/schema of the message body. In other words, we now know with some certainty that the message will contain a JSON payload with a key of size. So my thinking is that the check goes right after we know it is an OBJECT_FINALIZE event (i.e. basically where the TODO is right now).
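
A minimal sketch of that ordering (names below are hypothetical):

```go
package gcsnotify

import "errors"

// checkNotification applies the guards in the order described above: first
// confirm the message is an OBJECT_FINALIZE event, which fixes the schema
// of the body, and only then reject 0-byte objects from the parsed body.
func checkNotification(attrs map[string]string, bodySize string) error {
	if attrs["eventType"] != "OBJECT_FINALIZE" {
		return errors.New("unsupported event type")
	}
	if bodySize == "0" {
		return errors.New("object size is 0 bytes")
	}
	return nil
}
```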

> What do you think about a config option pointing to a cache resource to de-duplicate on gcs_key? At work we built an event-based log ingestion pipeline, and one of the pain points was handling cases where a production system uploads the same log file twice. I think it would make sense to de-duplicate based on the Pub/Sub message content before the object is even retrieved.

Oh, that's an interesting idea. But it should probably be captured in a different issue/PR for tracking and simplicity's sake.

@mhite (Contributor, Author) commented Jan 7, 2023:

I think I have an approach that addresses the problematic gcsfuse behavior [1]. By checking the object generation before download, we can avoid the race condition of potentially consuming the non-zero-byte version of a file twice. Additionally, we will refuse to process 0-byte files [if the user has the JSON object payload enabled for the notification config].

I'll do a bit of testing this weekend and update the PR.

[1] - https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/docs/semantics.md#pubsub-notifications-on-file-creation
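
For illustration, a sketch of a generation-pinned download along those lines, using the standard cloud.google.com/go/storage client (the function itself is hypothetical, not this PR's code):

```go
package gcsnotify

import (
	"context"
	"io"
	"strconv"

	"cloud.google.com/go/storage"
)

// downloadExactGeneration reads only the object generation named in the
// notification. If gcsfuse has since rewritten the object under a newer
// generation, the stale notification cannot pull the newer content, so
// the same bytes are never consumed twice.
func downloadExactGeneration(ctx context.Context, client *storage.Client, bucket, name, generation string) ([]byte, error) {
	gen, err := strconv.ParseInt(generation, 10, 64)
	if err != nil {
		return nil, err
	}
	r, err := client.Bucket(bucket).Object(name).Generation(gen).NewReader(ctx)
	if err != nil {
		// Returns storage.ErrObjectNotExist if that generation is gone.
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}
```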

@mhite (Contributor, Author) commented Jan 16, 2023:

@loicalleyne - want to give this latest version a test?

@loicalleyne (Contributor):

@mhite I ran a short smoke test; everything appeared to work OK.

@loicalleyne (Contributor):

@mhite is this PR still a WIP?

@mhite (Contributor, Author) commented Feb 3, 2023:

@loicalleyne - Should be ready for a review. I will update PR title.

At some point I want to add functionality that allows a user to choose (by configuration) to acknowledge (i.e. delete) bad/unsupported Pub/Sub messages. For now, you very much need to have a dead-letter queue set up.

@mhite changed the title from "wip for gcs-pubsub pr" to "gcs-pubsub pr" on Feb 3, 2023
@mihaitodor (Collaborator):

This looks like a good start. I did a bit of searching, and it looks like there's an official Pub/Sub emulator that can be run via gcloud to add integration tests. Annoyingly, in Google fashion, they don't offer a Docker container for this, but I see there's a maintained third-party one over at https://hub.docker.com/r/thekevjames/gcloud-pubsub-emulator. I'll defer to someone else for a proper review, since I'm not very familiar with Pub/Sub.
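
For what it's worth, the Go Pub/Sub client honors the PUBSUB_EMULATOR_HOST environment variable, so a test against that container could be wired up roughly like this (the port and project ID below are assumptions):

```go
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/pubsub"
)

func main() {
	// Point the client at the emulator container; 8681 is an assumed port,
	// use whatever the emulator was actually started on.
	os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8681")

	ctx := context.Background()
	// Any project ID works against the emulator; no credentials are needed.
	client, err := pubsub.NewClient(ctx, "test-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic, err := client.CreateTopic(ctx, "gcs-notifications")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := client.CreateSubscription(ctx, "benthos-test", pubsub.SubscriptionConfig{Topic: topic}); err != nil {
		log.Fatal(err)
	}
	log.Println("emulator topic and subscription ready")
}
```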

@mhite (Contributor, Author) commented Jun 18, 2023:

@mihaitodor - There is unfortunately no official Cloud Storage emulator either, which also makes things tricky. I did come across this, though: https://github.com/fsouza/fake-gcs-server

I think it might also support publishing Cloud Storage trigger notifications to Pub/Sub, so perhaps it could all be wired up to simulate an end-to-end environment.
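
As a hedged sketch of that wiring: the Go storage client honors the STORAGE_EMULATOR_HOST environment variable, and fake-gcs-server defaults to port 4443 (run it with -scheme http so the client's plain-HTTP requests work). Bucket and object names below are assumptions:

```go
package main

import (
	"context"
	"io"
	"log"
	"os"

	"cloud.google.com/go/storage"
)

func main() {
	// The Go client picks up the emulator endpoint from this variable and
	// talks plain HTTP to it, hence -scheme http on the fake-gcs-server side.
	os.Setenv("STORAGE_EMULATOR_HOST", "localhost:4443")

	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Read an object previously seeded into the fake server.
	r, err := client.Bucket("test-bucket").Object("logs/app.ndjson").NewReader(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	b, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read %d bytes from emulated bucket", len(b))
}
```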

@mihaitodor (Collaborator):

@mhite Yeah, Benthos is using that third-party one already. It has worked OK, and the maintainer is friendly if anything new needs to be added to it.

@loicalleyne (Contributor):

@mhite did you ever make this as a plugin?

@mhite (Contributor, Author) commented Sep 2, 2023:

@loicalleyne - The PR should work, but it needs merge conflicts resolved and integration tests written.
