-
Notifications
You must be signed in to change notification settings - Fork 1
/
s3handler.h
216 lines (177 loc) · 5.71 KB
/
s3handler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#pragma once
#include "SingleFileStorage.h"
#include "Buckets.h"
#include <condition_variable>
#include <memory>
#include <proxygen/httpserver/HTTPServer.h>
#include <proxygen/httpserver/RequestHandler.h>
#include <proxygen/httpserver/RequestHandlerFactory.h>
#include <proxygen/httpserver/ResponseBuilder.h>
#include <proxygen/lib/http/HTTPHeaders.h>
#include <proxygen/lib/http/ProxygenErrorEnum.h>
#include <vector>
#include <thread>
#include <expat.h>
#include <openssl/evp.h>
class ExpatXmlParser
{
public:
XML_Parser parser = nullptr;
void init() {
parser = XML_ParserCreate("UTF-8");
}
~ExpatXmlParser()
{
if(parser!=nullptr)
XML_ParserFree(parser);
}
};
class EvpMdCtx
{
public:
EVP_MD_CTX* ctx = nullptr;
bool init() {
assert(ctx==nullptr);
ctx = EVP_MD_CTX_new();
if(ctx==nullptr)
return false;
return EVP_DigestInit(ctx, EVP_md5())==1;
}
~EvpMdCtx() {
if(ctx != nullptr)
EVP_MD_CTX_free(ctx);
}
};
int mdb_cmp_s3key(const MDB_val *a, const MDB_val *b);
std::string s3key_common_prefix(const std::string_view key);
size_t s3key_common_prefix_hash(const std::string_view key);
struct KeyInfo
{
std::string key;
int64_t version;
int64_t bucketId;
};
struct KeyInfoView
{
std::string_view key;
int64_t version;
int64_t bucketId;
};
class S3Handler : public proxygen::RequestHandler
{
SingleFileStorage &sfs;
const std::string& root_key;
bool withBucketVersioning;
public:
S3Handler(SingleFileStorage &sfs, const std::string& root_key, const std::string& serverUrl, bool withBucketVersioning) : sfs(sfs), self(this), root_key(root_key), serverUrl(serverUrl), withBucketVersioning(withBucketVersioning) {}
void
onRequest(std::unique_ptr<proxygen::HTTPMessage> headers) noexcept override;
void onBody(std::unique_ptr<folly::IOBuf> body) noexcept override;
void onEOM() noexcept override;
void onUpgrade(proxygen::UpgradeProtocol proto) noexcept override;
void requestComplete() noexcept override;
void onError(proxygen::ProxygenError err) noexcept override;
void onEgressPaused() noexcept override;
void onEgressResumed() noexcept override;
struct MultiPartUploadData
{
enum class ParseState
{
Init,
InRoot,
InPart,
InPartNumber,
InEtag,
InUnknownPartAttr,
Finished,
InvalidPartOrder,
Unknown
};
ParseState parseState = ParseState::Init;
int64_t uploadId;
std::string verId;
struct PartData
{
std::string etag;
int partNumber = 0;
};
std::vector<PartData> parts;
};
struct MultiPartDownloadData
{
struct PartExt
{
int64_t size;
int start;
int len;
};
std::string etag;
int64_t uploadId;
int64_t numParts;
size_t extIdx = std::string::npos;
int64_t currOffset = 0;
PartExt currExt;
std::vector<PartExt> exts;
};
private:
void readFile(folly::EventBase *evb);
void readObject(folly::EventBase *evb, std::shared_ptr<S3Handler> self, int64_t offset);
void onBodyCPU(folly::EventBase *evb, int64_t offs, std::unique_ptr<folly::IOBuf> body);
void listObjects(proxygen::HTTPMessage& headers, const std::string& bucket);
void listObjectsV2(proxygen::HTTPMessage& headers, const std::string& bucket, const int64_t bucketId);
void getCommitObject(proxygen::HTTPMessage& headers);
void getObject(proxygen::HTTPMessage& headers);
void putObject(proxygen::HTTPMessage& headers);
void putObjectPart(proxygen::HTTPMessage& headers, int partNumber, int64_t uploadId, std::string uploadVerId);
void commit(proxygen::HTTPMessage& headers);
void deleteObject(proxygen::HTTPMessage& headers);
void listObjects(folly::EventBase *evb, std::shared_ptr<S3Handler> self, const std::string& continuationToken,
const int maxKeys, const std::optional<std::string>& prefix, const std::optional<std::string>& startAfter, const std::string& delimiter, const int64_t bucket,
const bool listV2, const std::string& bucketName);
void createMultipartUpload(proxygen::HTTPMessage& headers);
void finalizeMultipartUpload();
bool parseMultipartInfo(const std::string& md5sum, int64_t& totalLen);
std::string getEtag(const std::string& md5sum);
int seekMultipartExt(int64_t offset);
int readNextMultipartExt(int64_t offset);
int finalizeMultiPart();
int readMultipartExt(int64_t offset);
void readBodyThread(folly::EventBase *evb);
bool setKeyInfoFromPath(const std::string_view path);
enum class RequestType
{
Unknown,
GetObject,
HeadObject,
PutObject,
DeleteObject,
ListObjects,
CompleteMultipartUpload
};
std::shared_ptr<S3Handler> self;
RequestType request_type = RequestType::Unknown;
KeyInfo keyInfo;
std::atomic<bool> paused_{ false };
int64_t done_bytes = 0;
bool running = false;
bool finished_ = false;
std::atomic<int64_t> put_remaining = -1;
ExpatXmlParser xmlBody;
std::string serverUrl;
std::unique_ptr<MultiPartUploadData> multiPartUploadData;
std::unique_ptr<MultiPartDownloadData> multiPartDownloadData;
std::mutex extents_mutex;
std::condition_variable extents_cond;
std::vector<SingleFileStorage::Ext> extents;
EvpMdCtx evpMdCtx;
struct BodyObj
{
int64_t offset;
std::unique_ptr<folly::IOBuf> body;
bool unpause;
};
std::mutex bodyMutex;
bool hasBodyThread = false;
std::queue<BodyObj> bodyQueue;
Buckets buckets;
};