protobuf.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package protobuf
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "net/http"
  19. "reflect"
  20. "github.com/gogo/protobuf/proto"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. "k8s.io/apimachinery/pkg/runtime"
  23. "k8s.io/apimachinery/pkg/runtime/schema"
  24. "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
  25. "k8s.io/apimachinery/pkg/util/framer"
  26. )
  27. var (
  28. // protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
  29. // proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73, with the fourth
  30. // byte being reserved for the encoding style. The only encoding style defined is 0x00, which means that
  31. // the rest of the byte stream is a message of type k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
  32. //
  33. // See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
  34. //
  35. // This encoding scheme is experimental, and is subject to change at any time.
  36. protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
  37. )
  38. type errNotMarshalable struct {
  39. t reflect.Type
  40. }
  41. func (e errNotMarshalable) Error() string {
  42. return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t)
  43. }
  44. func (e errNotMarshalable) Status() metav1.Status {
  45. return metav1.Status{
  46. Status: metav1.StatusFailure,
  47. Code: http.StatusNotAcceptable,
  48. Reason: metav1.StatusReason("NotAcceptable"),
  49. Message: e.Error(),
  50. }
  51. }
  52. // IsNotMarshalable checks the type of error, returns a boolean true if error is not nil and not marshalable false otherwise
  53. func IsNotMarshalable(err error) bool {
  54. _, ok := err.(errNotMarshalable)
  55. return err != nil && ok
  56. }
  57. // NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
  58. // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
  59. // as-is (any type info passed with the object will be used).
  60. func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
  61. return &Serializer{
  62. prefix: protoEncodingPrefix,
  63. creater: creater,
  64. typer: typer,
  65. }
  66. }
  67. // Serializer handles encoding versioned objects into the proper wire form
  68. type Serializer struct {
  69. prefix []byte
  70. creater runtime.ObjectCreater
  71. typer runtime.ObjectTyper
  72. }
  73. var _ runtime.Serializer = &Serializer{}
  74. var _ recognizer.RecognizingDecoder = &Serializer{}
  75. const serializerIdentifier runtime.Identifier = "protobuf"
  76. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  77. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  78. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  79. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  80. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  81. // errors, the method will return the calculated schema kind.
  82. func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  83. prefixLen := len(s.prefix)
  84. switch {
  85. case len(originalData) == 0:
  86. // TODO: treat like decoding {} from JSON with defaulting
  87. return nil, nil, fmt.Errorf("empty data")
  88. case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
  89. return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
  90. case len(originalData) == prefixLen:
  91. // TODO: treat like decoding {} from JSON with defaulting
  92. return nil, nil, fmt.Errorf("empty body")
  93. }
  94. data := originalData[prefixLen:]
  95. unk := runtime.Unknown{}
  96. if err := unk.Unmarshal(data); err != nil {
  97. return nil, nil, err
  98. }
  99. actual := unk.GroupVersionKind()
  100. copyKindDefaults(&actual, gvk)
  101. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  102. *intoUnknown = unk
  103. if ok, _, _ := s.RecognizesData(unk.Raw); ok {
  104. intoUnknown.ContentType = runtime.ContentTypeProtobuf
  105. }
  106. return intoUnknown, &actual, nil
  107. }
  108. if into != nil {
  109. types, _, err := s.typer.ObjectKinds(into)
  110. switch {
  111. case runtime.IsNotRegisteredError(err):
  112. pb, ok := into.(proto.Message)
  113. if !ok {
  114. return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
  115. }
  116. if err := proto.Unmarshal(unk.Raw, pb); err != nil {
  117. return nil, &actual, err
  118. }
  119. return into, &actual, nil
  120. case err != nil:
  121. return nil, &actual, err
  122. default:
  123. copyKindDefaults(&actual, &types[0])
  124. // if the result of defaulting did not set a version or group, ensure that at least group is set
  125. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  126. // of into is set if there is no better information from the caller or object.
  127. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  128. actual.Group = types[0].Group
  129. }
  130. }
  131. }
  132. if len(actual.Kind) == 0 {
  133. return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
  134. }
  135. if len(actual.Version) == 0 {
  136. return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
  137. }
  138. return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
  139. }
  140. // Encode serializes the provided object to the given writer.
  141. func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
  142. if co, ok := obj.(runtime.CacheableObject); ok {
  143. return co.CacheEncode(s.Identifier(), s.doEncode, w)
  144. }
  145. return s.doEncode(obj, w)
  146. }
  147. func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
  148. prefixSize := uint64(len(s.prefix))
  149. var unk runtime.Unknown
  150. switch t := obj.(type) {
  151. case *runtime.Unknown:
  152. estimatedSize := prefixSize + uint64(t.Size())
  153. data := make([]byte, estimatedSize)
  154. i, err := t.MarshalTo(data[prefixSize:])
  155. if err != nil {
  156. return err
  157. }
  158. copy(data, s.prefix)
  159. _, err = w.Write(data[:prefixSize+uint64(i)])
  160. return err
  161. default:
  162. kind := obj.GetObjectKind().GroupVersionKind()
  163. unk = runtime.Unknown{
  164. TypeMeta: runtime.TypeMeta{
  165. Kind: kind.Kind,
  166. APIVersion: kind.GroupVersion().String(),
  167. },
  168. }
  169. }
  170. switch t := obj.(type) {
  171. case bufferedMarshaller:
  172. // this path performs a single allocation during write but requires the caller to implement
  173. // the more efficient Size and MarshalToSizedBuffer methods
  174. encodedSize := uint64(t.Size())
  175. estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
  176. data := make([]byte, estimatedSize)
  177. i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
  178. if err != nil {
  179. return err
  180. }
  181. copy(data, s.prefix)
  182. _, err = w.Write(data[:prefixSize+uint64(i)])
  183. return err
  184. case proto.Marshaler:
  185. // this path performs extra allocations
  186. data, err := t.Marshal()
  187. if err != nil {
  188. return err
  189. }
  190. unk.Raw = data
  191. estimatedSize := prefixSize + uint64(unk.Size())
  192. data = make([]byte, estimatedSize)
  193. i, err := unk.MarshalTo(data[prefixSize:])
  194. if err != nil {
  195. return err
  196. }
  197. copy(data, s.prefix)
  198. _, err = w.Write(data[:prefixSize+uint64(i)])
  199. return err
  200. default:
  201. // TODO: marshal with a different content type and serializer (JSON for third party objects)
  202. return errNotMarshalable{reflect.TypeOf(obj)}
  203. }
  204. }
  205. // Identifier implements runtime.Encoder interface.
  206. func (s *Serializer) Identifier() runtime.Identifier {
  207. return serializerIdentifier
  208. }
  209. // RecognizesData implements the RecognizingDecoder interface.
  210. func (s *Serializer) RecognizesData(data []byte) (bool, bool, error) {
  211. return bytes.HasPrefix(data, s.prefix), false, nil
  212. }
  213. // copyKindDefaults defaults dst to the value in src if dst does not have a value set.
  214. func copyKindDefaults(dst, src *schema.GroupVersionKind) {
  215. if src == nil {
  216. return
  217. }
  218. // apply kind and version defaulting from provided default
  219. if len(dst.Kind) == 0 {
  220. dst.Kind = src.Kind
  221. }
  222. if len(dst.Version) == 0 && len(src.Version) > 0 {
  223. dst.Group = src.Group
  224. dst.Version = src.Version
  225. }
  226. }
  227. // bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
  228. // byte buffers by pre-calculating the size of the final buffer needed.
  229. type bufferedMarshaller interface {
  230. proto.Sizer
  231. runtime.ProtobufMarshaller
  232. }
  233. // Like bufferedMarshaller, but is able to marshal backwards, which is more efficient since it doesn't call Size() as frequently.
  234. type bufferedReverseMarshaller interface {
  235. proto.Sizer
  236. runtime.ProtobufReverseMarshaller
  237. }
  238. // estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
  239. // object with a nil RawJSON struct and the expected size of the provided buffer. The
  240. // returned size will not be correct if RawJSOn is set on unk.
  241. func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
  242. size := uint64(unk.Size())
  243. // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
  244. // and the size of the array.
  245. size += 1 + 8 + byteSize
  246. return size
  247. }
  248. // NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
  249. // is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
  250. // encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
  251. //
  252. // This encoding scheme is experimental, and is subject to change at any time.
  253. func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *RawSerializer {
  254. return &RawSerializer{
  255. creater: creater,
  256. typer: typer,
  257. }
  258. }
  259. // RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
  260. // type).
  261. type RawSerializer struct {
  262. creater runtime.ObjectCreater
  263. typer runtime.ObjectTyper
  264. }
  265. var _ runtime.Serializer = &RawSerializer{}
  266. const rawSerializerIdentifier runtime.Identifier = "raw-protobuf"
  267. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  268. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  269. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  270. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  271. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  272. // errors, the method will return the calculated schema kind.
  273. func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  274. if into == nil {
  275. return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
  276. }
  277. if len(originalData) == 0 {
  278. // TODO: treat like decoding {} from JSON with defaulting
  279. return nil, nil, fmt.Errorf("empty data")
  280. }
  281. data := originalData
  282. actual := &schema.GroupVersionKind{}
  283. copyKindDefaults(actual, gvk)
  284. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  285. intoUnknown.Raw = data
  286. intoUnknown.ContentEncoding = ""
  287. intoUnknown.ContentType = runtime.ContentTypeProtobuf
  288. intoUnknown.SetGroupVersionKind(*actual)
  289. return intoUnknown, actual, nil
  290. }
  291. types, _, err := s.typer.ObjectKinds(into)
  292. switch {
  293. case runtime.IsNotRegisteredError(err):
  294. pb, ok := into.(proto.Message)
  295. if !ok {
  296. return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
  297. }
  298. if err := proto.Unmarshal(data, pb); err != nil {
  299. return nil, actual, err
  300. }
  301. return into, actual, nil
  302. case err != nil:
  303. return nil, actual, err
  304. default:
  305. copyKindDefaults(actual, &types[0])
  306. // if the result of defaulting did not set a version or group, ensure that at least group is set
  307. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  308. // of into is set if there is no better information from the caller or object.
  309. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  310. actual.Group = types[0].Group
  311. }
  312. }
  313. if len(actual.Kind) == 0 {
  314. return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
  315. }
  316. if len(actual.Version) == 0 {
  317. return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
  318. }
  319. return unmarshalToObject(s.typer, s.creater, actual, into, data)
  320. }
  321. // unmarshalToObject is the common code between decode in the raw and normal serializer.
  322. func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
  323. // use the target if necessary
  324. obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
  325. if err != nil {
  326. return nil, actual, err
  327. }
  328. pb, ok := obj.(proto.Message)
  329. if !ok {
  330. return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
  331. }
  332. if err := proto.Unmarshal(data, pb); err != nil {
  333. return nil, actual, err
  334. }
  335. if actual != nil {
  336. obj.GetObjectKind().SetGroupVersionKind(*actual)
  337. }
  338. return obj, actual, nil
  339. }
  340. // Encode serializes the provided object to the given writer. Overrides is ignored.
  341. func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
  342. if co, ok := obj.(runtime.CacheableObject); ok {
  343. return co.CacheEncode(s.Identifier(), s.doEncode, w)
  344. }
  345. return s.doEncode(obj, w)
  346. }
  347. func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer) error {
  348. switch t := obj.(type) {
  349. case bufferedReverseMarshaller:
  350. // this path performs a single allocation during write but requires the caller to implement
  351. // the more efficient Size and MarshalToSizedBuffer methods
  352. encodedSize := uint64(t.Size())
  353. data := make([]byte, encodedSize)
  354. n, err := t.MarshalToSizedBuffer(data)
  355. if err != nil {
  356. return err
  357. }
  358. _, err = w.Write(data[:n])
  359. return err
  360. case bufferedMarshaller:
  361. // this path performs a single allocation during write but requires the caller to implement
  362. // the more efficient Size and MarshalTo methods
  363. encodedSize := uint64(t.Size())
  364. data := make([]byte, encodedSize)
  365. n, err := t.MarshalTo(data)
  366. if err != nil {
  367. return err
  368. }
  369. _, err = w.Write(data[:n])
  370. return err
  371. case proto.Marshaler:
  372. // this path performs extra allocations
  373. data, err := t.Marshal()
  374. if err != nil {
  375. return err
  376. }
  377. _, err = w.Write(data)
  378. return err
  379. default:
  380. return errNotMarshalable{reflect.TypeOf(obj)}
  381. }
  382. }
  383. // Identifier implements runtime.Encoder interface.
  384. func (s *RawSerializer) Identifier() runtime.Identifier {
  385. return rawSerializerIdentifier
  386. }
  387. // LengthDelimitedFramer is exported variable of type lengthDelimitedFramer
  388. var LengthDelimitedFramer = lengthDelimitedFramer{}
  389. // Provides length delimited frame reader and writer methods
  390. type lengthDelimitedFramer struct{}
  391. // NewFrameWriter implements stream framing for this serializer
  392. func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
  393. return framer.NewLengthDelimitedFrameWriter(w)
  394. }
  395. // NewFrameReader implements stream framing for this serializer
  396. func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
  397. return framer.NewLengthDelimitedFrameReader(r)
  398. }