Consolidater

Bases: Client

Source code in geocube/consolidater.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class Consolidater(Client):
    """
    Geocube client specialized in job management and data consolidation.

    Public methods delegate to private `_`-prefixed counterparts decorated with
    `utils.catch_rpc_error`, which translates gRPC failures into Geocube errors.
    All RPCs go through `self.stub` (inherited from `Client`).
    """

    def list_jobs(self, name_like: str = "", page: int = 0, limit: int = 10):
        """
        List jobs by name
        name_like: pattern of the name. * and ? are supported to match all or any character.
        page, limit: pagination of the results.
        """
        return self._list_jobs(name_like, page, limit)

    def job(self, name: str):
        """ Get job by name. Shortcut for ListJobs(name)[0]. Only a few logs are loaded. """
        return self._job(name)

    def get_job(self, job_id: Union[str, entities.Job], log_page: int = 0, log_limit: int = 1000):
        """
        Get job by id.
        Logs are loaded by pages, because some big jobs have too many logs to fit in a gRPC response.
        """
        return self._get_job(job_id, log_page, log_limit)

    @staticmethod
    def wait_job(job: entities.Job, wait_secs=15, timeout_secs=None, verbose=True):
        """
        Wait for the job to finish or fail.
        If the execution level is step-by-step, it will automatically continue.
        If verbose=True, the last log is printed every time a state change is detected.
        Raises TimeoutError if timeout_secs elapses before the job reaches a terminal state.
        """
        prev_state = job.state
        # Poll until the job reaches a terminal state.
        while job.state not in ['DONE', 'FAILED', 'DONEBUTUNTIDY']:
            time.sleep(wait_secs)
            # Fetch only the latest log entry when verbose (log_limit=1), none otherwise.
            job.refresh(log_limit=1 if verbose else 0)
            if job.state != prev_state:
                prev_state = job.state
                if verbose:
                    print(job.logs[-1])
            # Step-by-step execution: the job pauses between steps; resume it.
            if job.waiting:
                job.next()
            if timeout_secs is not None:
                timeout_secs -= wait_secs
                if timeout_secs < 0:
                    raise TimeoutError(f"job {job.name}: state={job.state}")

    def remove_terminated_jobs(self, name_like: str = "", state: str = ""):
        """
        Remove all the jobs from the Geocube given a name pattern (by default, all terminated jobs)
        name_like: pattern of the name. * and ? are supported to match all or any character.
        state: state of the jobs to be removed.
        """
        return self._remove_terminated_jobs(name_like, state)

    def consolidate(self,
                    job_name: str,
                    instance: Union[str, entities.VariableInstance],
                    layout: Union[str, entities.Layout],
                    *,
                    records: Union[List[entities.RecordIdentifiers], None] = None,
                    tags: Union[Dict[str, str], None] = None,
                    from_time: Union[datetime, None] = None,
                    to_time: Union[datetime, None] = None,
                    collapse_on_record: Union[entities.Record, str, None] = None,
                    execution_level: entities.ExecutionLevel = entities.ExecutionLevel.ASYNCHRONOUS):
        """
        Create a consolidation job on the given variable instance using the given layout.

        The datasets to consolidate are selected either by an explicit list of `records`,
        or by filters (`tags`, `from_time`, `to_time`). The two selection modes are
        exclusive: the filter arguments are ignored (with a warning) when `records`
        is provided. Returns the created job.
        """
        return self._consolidate(job_name, instance, layout, records, tags, from_time, to_time,
                                 collapse_on_record, execution_level)

    @utils.catch_rpc_error
    def _list_jobs(self, name_like: str, page: int, limit: int):
        """ gRPC ListJobs call; maps each protobuf job to an entities.Job. """
        res = self.stub.ListJobs(operations_pb2.ListJobsRequest(name_like=name_like, page=page, limit=limit))
        return [entities.Job.from_pb(self.stub, r) for r in res.jobs]

    @utils.catch_rpc_error
    def _job(self, name: str):
        """ gRPC ListJobs call; returns the first match or raises GeocubeError(NOT_FOUND). """
        res = self.stub.ListJobs(operations_pb2.ListJobsRequest(name_like=name))
        if len(res.jobs) == 0:
            raise utils.GeocubeError("job", "NOT_FOUND", "with name: " + name)
        return entities.Job.from_pb(self.stub, res.jobs[0])

    @utils.catch_rpc_error
    def _get_job(self, job_id: Union[str, entities.Job], log_page: int, log_limit: int):
        """ gRPC GetJob call by id (accepts a Job entity or a raw id string). """
        res = self.stub.GetJob(operations_pb2.GetJobRequest(id=entities.get_id(job_id),
                                                            log_page=log_page, log_limit=log_limit))
        return entities.Job.from_pb(self.stub, res.job)

    @utils.catch_rpc_error
    def _remove_terminated_jobs(self, name_like: str, state: str):
        """ gRPC CleanJobs call; returns nothing. """
        self.stub.CleanJobs(operations_pb2.CleanJobsRequest(name_like=name_like, state=state))

    @utils.catch_rpc_error
    def _consolidate(self,
                    job_name: str,
                    instance: Union[str, entities.VariableInstance],
                    layout: Union[str, entities.Layout],
                    records: Union[List[entities.RecordIdentifiers], None],
                    tags: Union[Dict[str, str], None],
                    from_time: Union[datetime, None],
                    to_time: Union[datetime, None],
                    collapse_on_record: Union[entities.Record, str, None],
                    execution_level: entities.ExecutionLevel):
        """
        gRPC Consolidate call: build the request (record list OR record filters),
        submit it, and return the created job fetched by its id.
        """
        # Fields shared by both request variants.
        common = {
            "job_name":              job_name,
            "instance_id":           entities.get_id(instance),
            "layout_name":           entities.get_id(layout),
            "execution_level":       execution_level.value,
            "collapse_on_record_id": entities.get_id(collapse_on_record) if collapse_on_record is not None else "",
        }

        if records is not None:
            # Explicit record selection: the filter arguments are ignored, warn the caller.
            req = operations_pb2.ConsolidateRequest(
                **common, records=records_pb2.RecordIdList(ids=entities.get_ids(records)))
            if from_time is not None:
                warnings.warn("from_time is ignored if records is provided as argument to consolidate")
            if to_time is not None:
                warnings.warn("to_time is ignored if records is provided as argument to consolidate")
            if tags is not None:
                warnings.warn("tags is ignored if records is provided as argument to consolidate")
        else:
            # Filter-based selection: unset time bounds stay at the null-timestamp sentinel.
            from_time_pb = utils.pb_null_timestamp()
            if from_time is not None:
                from_time_pb.FromDatetime(from_time)
            to_time_pb = utils.pb_null_timestamp()
            if to_time is not None:
                to_time_pb.FromDatetime(to_time)
            req = operations_pb2.ConsolidateRequest(**common, filters=records_pb2.RecordFilters(
                tags=tags, from_time=from_time_pb, to_time=to_time_pb
            ))
        return self.get_job(self.stub.Consolidate(req).job_id)

get_job(job_id, log_page=0, log_limit=1000)

Get job by id. Logs are loaded by pages, because some big jobs have too many logs to fit in a gRPC response.

Source code in geocube/consolidater.py
23
24
25
26
27
28
def get_job(self, job_id: Union[str, entities.Job], log_page: int = 0, log_limit: int = 1000):
    """
    Get job by id.
    Logs are loaded by pages, because some big jobs have too many logs to fit in a gRPC response.
    job_id: a Job entity or a raw id string.
    """
    return self._get_job(job_id, log_page, log_limit)

job(name)

Get job by name. Shortcut for ListJobs(name)[0]. Only a few logs are loaded.

Source code in geocube/consolidater.py
19
20
21
def job(self, name: str):
    """ Get job by name. Shortcut for ListJobs(name)[0]. Only a few logs are loaded. """
    return self._job(name)

list_jobs(name_like='', page=0, limit=10)

List jobs by name name_like: pattern of the name. * and ? are supported to match all or any character.

Source code in geocube/consolidater.py
12
13
14
15
16
17
def list_jobs(self, name_like: str = "", page: int = 0, limit: int = 10):
    """
    List jobs by name
    name_like: pattern of the name. * and ? are supported to match all or any character.
    page, limit: pagination of the results.
    """
    return self._list_jobs(name_like, page, limit)

remove_terminated_jobs(name_like='', state='')

Remove all the jobs from the Geocube given a name pattern (by default, all terminated jobs) name_like: pattern of the name. * and ? are supported to match all or any character. state: state of the jobs to be removed.

Source code in geocube/consolidater.py
52
53
54
55
56
57
58
def remove_terminated_jobs(self, name_like: str = "", state: str = ""):
    """
    Remove all the jobs from the Geocube given a name pattern (by default, all terminated jobs)
    name_like: pattern of the name. * and ? are supported to match all or any character.
    state: state of the jobs to be removed.
    """
    # Delegates to the private RPC wrapper (decorated with utils.catch_rpc_error).
    return self._remove_terminated_jobs(name_like, state)

wait_job(job, wait_secs=15, timeout_secs=None, verbose=True) staticmethod

Wait for the job to finish or fail. If the execution level is step-by-step, it will automatically continue. If verbose=True, the last log is printed every time a state change is detected.

Source code in geocube/consolidater.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@staticmethod
def wait_job(job: entities.Job, wait_secs=15, timeout_secs=None, verbose=True):
    """
    Wait for the job to finish or fail.
    If the execution level is step-by-step, it will automatically continue.
    If verbose=True, the last log is printed every time a state change is detected.
    Raises TimeoutError if timeout_secs elapses before the job reaches a terminal state.
    """
    prev_state = job.state
    # Poll until the job reaches a terminal state.
    while job.state not in ['DONE', 'FAILED', 'DONEBUTUNTIDY']:
        time.sleep(wait_secs)
        # Fetch only the latest log entry when verbose (log_limit=1), none otherwise.
        job.refresh(log_limit=1 if verbose else 0)
        if job.state != prev_state:
            prev_state = job.state
            if verbose:
                print(job.logs[-1])
        # Step-by-step execution: the job pauses between steps; resume it.
        if job.waiting:
            job.next()
        if timeout_secs is not None:
            timeout_secs -= wait_secs
            if timeout_secs < 0:
                raise TimeoutError(f"job {job.name}: state={job.state}")