Source code for designateclient.v2.zones

# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient.v2.base import V2Controller
from designateclient.v2 import utils as v2_utils


class ZoneController(V2Controller):
    """Controller issuing requests against the ``/zones`` endpoints.

    Methods taking a ``zone`` argument first pass it through
    ``v2_utils.resolve_by_name`` (with ``self.list`` as the lookup
    callable) and use the resolved value in the request path.
    """

    def create(self, name, type_=None, email=None, description=None,
               ttl=None, masters=None, attributes=None):
        """POST a new zone definition to ``/zones``.

        :param name: value sent as ``name``.
        :param type_: value sent as ``type``; a falsy value is replaced
            by ``'PRIMARY'``.
        :param email: sent only for ``PRIMARY`` zones and only if truthy.
        :param description: sent whenever it is not ``None``.
        :param ttl: sent only for ``PRIMARY`` zones when not ``None``.
        :param masters: sent only for ``SECONDARY`` zones and only if
            truthy.
        :param attributes: sent whenever it is not ``None``.
        :returns: the result of ``self._post``.
        """
        zone_type = type_ or 'PRIMARY'
        payload = {'name': name, 'type': zone_type}

        # Type-specific fields: PRIMARY and SECONDARY zones carry
        # different optional settings; any other type gets none of them.
        if zone_type == 'PRIMARY':
            if email:
                payload['email'] = email
            if ttl is not None:
                payload['ttl'] = ttl
        elif zone_type == 'SECONDARY' and masters:
            payload['masters'] = masters

        # Fields common to every zone type, added in a fixed order.
        for field, value in (('description', description),
                             ('attributes', attributes)):
            if value is not None:
                payload[field] = value

        return self._post('/zones', data=payload)

    def list(self, criterion=None, marker=None, limit=None):
        """GET ``/zones`` and return the ``zones`` key of the response.

        ``criterion``, ``marker`` and ``limit`` are forwarded unchanged
        to ``self.build_url``.
        """
        return self._get(
            self.build_url('/zones', criterion, marker, limit),
            response_key='zones',
        )

    def get(self, zone):
        """GET ``/zones/<zone>`` for the resolved zone."""
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        return self._get(f'/zones/{zone_id}')

    def update(self, zone, values):
        """PATCH ``/zones/<zone>`` with ``values`` as the request data."""
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        return self._patch(
            self.build_url(f'/zones/{zone_id}'),
            data=values,
        )

    def delete(self, zone, delete_shares=False):
        """DELETE ``/zones/<zone>`` and return the response body.

        :param zone: zone reference to resolve.
        :param delete_shares: when truthy, the request carries the
            ``X-Designate-Delete-Shares: true`` header.
        :returns: the body element of the session's delete result.
        """
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        url = self.build_url(f'/zones/{zone_id}')

        # Only pass ``headers`` when there is something to send, so the
        # plain delete call stays free of extra keyword arguments.
        extra = {}
        if delete_shares:
            extra['headers'] = {'X-Designate-Delete-Shares': 'true'}

        _resp, body = self.client.session.delete(url, **extra)
        return body

    def abandon(self, zone):
        """POST to ``/zones/<zone>/tasks/abandon``; returns nothing."""
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        self.client.session.post(f'/zones/{zone_id}/tasks/abandon')

    def axfr(self, zone):
        """POST to ``/zones/<zone>/tasks/xfr``; returns nothing."""
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        self.client.session.post(f'/zones/{zone_id}/tasks/xfr')

    def pool_move(self, zone, values):
        """POST ``values`` to ``/zones/<zone>/tasks/pool_move``."""
        zone_id = v2_utils.resolve_by_name(self.list, zone)
        target = self.build_url('/zones/%s/tasks/pool_move' % zone_id)
        return self._post(target, data=values)
class ZoneTransfersController(V2Controller):
    """Controller for zone transfer requests and transfer accepts.

    All paths live under ``/zones/tasks/transfer_requests`` and
    ``/zones/tasks/transfer_accepts``, except request creation, which
    is scoped to a single zone.
    """

    def create_request(self, zone, target_project_id, description=None):
        """POST a transfer request for ``zone``.

        :param zone: zone reference, resolved through
            ``v2_utils.resolve_by_name`` using ``self.client.zones.list``.
        :param target_project_id: sent as ``target_project_id``.
        :param description: sent only when it is not ``None``.
        :returns: the result of ``self._post``.
        """
        zone = v2_utils.resolve_by_name(self.client.zones.list, zone)

        data = {'target_project_id': target_project_id}
        if description is not None:
            data['description'] = description

        url = f'/zones/{zone}/tasks/transfer_requests'
        return self._post(url, data=data)

    def get_request(self, transfer_id):
        """GET ``/zones/tasks/transfer_requests/<transfer_id>``."""
        url = f'/zones/tasks/transfer_requests/{transfer_id}'
        return self._get(url)

    def list_requests(self):
        """GET all transfer requests (``transfer_requests`` key)."""
        url = '/zones/tasks/transfer_requests'
        return self._get(url, response_key='transfer_requests')

    def update_request(self, transfer_id, values):
        """PATCH a transfer request with ``values`` as request data."""
        url = f'/zones/tasks/transfer_requests/{transfer_id}'
        return self._patch(url, data=values)

    def delete_request(self, transfer_id):
        """DELETE ``/zones/tasks/transfer_requests/<transfer_id>``.

        :returns: the result of ``self._delete``.
        """
        url = f'/zones/tasks/transfer_requests/{transfer_id}'
        # Propagate the result like the other ``_delete``-based methods
        # in this module instead of silently discarding it.
        return self._delete(url)

    def accept_request(self, transfer_id, key):
        """POST an accept for a transfer request.

        :param transfer_id: sent as ``zone_transfer_request_id``.
        :param key: sent as ``key``.
        :returns: the result of ``self._post``.
        """
        url = '/zones/tasks/transfer_accepts'
        data = {
            'key': key,
            'zone_transfer_request_id': transfer_id,
        }
        return self._post(url, data=data)

    def get_accept(self, accept_id):
        """GET ``/zones/tasks/transfer_accepts/<accept_id>``."""
        url = f'/zones/tasks/transfer_accepts/{accept_id}'
        return self._get(url)

    def list_accepts(self):
        """GET all transfer accepts (``transfer_accepts`` key)."""
        url = '/zones/tasks/transfer_accepts'
        return self._get(url, response_key='transfer_accepts')
class ZoneExportsController(V2Controller):
    """Controller for the zone export task endpoints."""

    def create(self, zone):
        """POST to ``/zones/<zone>/tasks/export``.

        ``zone`` is resolved through ``v2_utils.resolve_by_name`` using
        ``self.client.zones.list`` before the path is built.
        """
        resolved = v2_utils.resolve_by_name(self.client.zones.list, zone)
        export_url = f'/zones/{resolved}/tasks/export'
        return self._post(export_url)

    def get_export_record(self, zone_export_id):
        """GET ``/zones/tasks/exports/<zone_export_id>``."""
        record_url = f'/zones/tasks/exports/{zone_export_id}'
        return self._get(record_url)

    def list(self):
        """GET ``/zones/tasks/exports``; no response key is applied."""
        listing_url = '/zones/tasks/exports'
        return self._get(listing_url)

    def delete(self, zone_export_id):
        """DELETE ``/zones/tasks/exports/<zone_export_id>``."""
        record_url = f'/zones/tasks/exports/{zone_export_id}'
        return self._delete(record_url)

    def get_export(self, zone_export_id):
        """GET the export content, requesting ``Accept: text/dns``."""
        content_url = f'/zones/tasks/exports/{zone_export_id}/export'
        accept_dns = {'Accept': 'text/dns'}
        return self._get(content_url, headers=accept_dns)
class ZoneImportsController(V2Controller):
    """Controller for the zone import task endpoints."""

    def create(self, zone_file_contents):
        """POST ``zone_file_contents`` to ``/zones/tasks/imports``.

        The request is sent with ``Content-Type: text/dns``.
        """
        dns_content_type = {'Content-Type': 'text/dns'}
        return self._post(
            '/zones/tasks/imports',
            data=zone_file_contents,
            headers=dns_content_type,
        )

    def get_import_record(self, zone_import_id):
        """GET ``/zones/tasks/imports/<zone_import_id>``."""
        record_url = f'/zones/tasks/imports/{zone_import_id}'
        return self._get(record_url)

    def list(self):
        """GET ``/zones/tasks/imports``; no response key is applied."""
        listing_url = '/zones/tasks/imports'
        return self._get(listing_url)

    def delete(self, zone_import_id):
        """DELETE ``/zones/tasks/imports/<zone_import_id>``."""
        record_url = f'/zones/tasks/imports/{zone_import_id}'
        return self._delete(record_url)
class ZoneShareController(V2Controller):
    """Controller for the ``/zones/<zone>/shares`` endpoints.

    Every method resolves its ``zone`` argument through
    ``v2_utils.resolve_by_name`` (using ``self.client.zones.list``)
    before building the request path.
    """

    def create(self, zone, target_project_id):
        """POST a share of ``zone``, sending ``target_project_id``."""
        resolved = v2_utils.resolve_by_name(self.client.zones.list, zone)
        payload = {'target_project_id': target_project_id}
        shares_url = f'/zones/{resolved}/shares'
        return self._post(shares_url, data=payload)

    def list(self, zone, criterion=None, marker=None, limit=None):
        """GET the shares of ``zone`` (``shared_zones`` key).

        ``criterion``, ``marker`` and ``limit`` are forwarded unchanged
        to ``self.build_url``.
        """
        resolved = v2_utils.resolve_by_name(self.client.zones.list, zone)
        shares_url = self.build_url(
            f'/zones/{resolved}/shares', criterion, marker, limit)
        return self._get(shares_url, response_key='shared_zones')

    def delete(self, zone, shared_zone_id):
        """DELETE ``/zones/<zone>/shares/<shared_zone_id>``."""
        resolved = v2_utils.resolve_by_name(self.client.zones.list, zone)
        share_url = f'/zones/{resolved}/shares/{shared_zone_id}'
        return self._delete(share_url)

    def get(self, zone, shared_zone_id):
        """GET ``/zones/<zone>/shares/<shared_zone_id>``."""
        resolved = v2_utils.resolve_by_name(self.client.zones.list, zone)
        share_url = f'/zones/{resolved}/shares/{shared_zone_id}'
        return self._get(share_url)